JBoss hornetq SVN: r11633 - in trunk: hornetq-core/src/main/java/org/hornetq/core/paging/impl and 7 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-02 12:16:50 -0400 (Wed, 02 Nov 2011)
New Revision: 11633
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
trunk/hornetq-core/src/main/javacc/FilterParser.jj
trunk/hornetq-jms/src/main/java/org/hornetq/jms/client/HornetQSession.java
trunk/hornetq-jms/src/main/java/org/hornetq/jms/client/HornetQTopic.java
trunk/hornetq-jms/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java
trunk/hornetq-jms/src/main/java/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
Log:
clean up
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-11-02 16:16:14 UTC (rev 11632)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-11-02 16:16:50 UTC (rev 11633)
@@ -1689,23 +1689,21 @@
super.finalize();
}
- class Connector
+ private class Connector
{
private final TransportConfiguration initialConnector;
private volatile ClientSessionFactoryInternal factory;
- private boolean interrupted = false;
-
private Exception e;
- public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
+ Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
{
this.initialConnector = initialConnector;
this.factory = factory;
}
- public ClientSessionFactory tryConnect() throws HornetQException
+ private ClientSessionFactory tryConnect() throws HornetQException
{
if (log.isDebugEnabled())
{
@@ -1738,8 +1736,6 @@
public void disconnect()
{
- interrupted = true;
-
if (factory != null)
{
factory.causeExit();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-11-02 16:16:14 UTC (rev 11632)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-11-02 16:16:50 UTC (rev 11633)
@@ -28,7 +28,6 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.MessageReference;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
@@ -57,9 +56,9 @@
private volatile boolean rolledback = false;
- private AtomicInteger numberOfMessages = new AtomicInteger(0);
+ private final AtomicInteger numberOfMessages = new AtomicInteger(0);
- private AtomicInteger numberOfPersistentMessages = new AtomicInteger(0);
+ private final AtomicInteger numberOfPersistentMessages = new AtomicInteger(0);
private List<Pair<PageSubscription, PagePosition>> lateDeliveries;
@@ -228,11 +227,6 @@
{
PageTransactionInfoImpl.this.onUpdate(1, storageManager, pagingManager);
}
-
- public List<MessageReference> getRelatedMessageReferences()
- {
- return null;
- }
});
}
@@ -268,6 +262,7 @@
}
}
+ @Override
public String toString()
{
return "PageTransactionInfoImpl(transactionID=" + transactionID +
@@ -324,7 +319,7 @@
static class UpdatePageTXOperation extends TransactionOperationAbstract
{
- private HashMap<PageTransactionInfo, AtomicInteger> countsToUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();
+ private final HashMap<PageTransactionInfo, AtomicInteger> countsToUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();
private boolean stored = false;
@@ -356,16 +351,19 @@
counter.addAndGet(increment);
}
+ @Override
public void beforePrepare(Transaction tx) throws Exception
{
storeUpdates(tx);
}
+ @Override
public void beforeCommit(Transaction tx) throws Exception
{
storeUpdates(tx);
}
+ @Override
public void afterCommit(Transaction tx)
{
for (Map.Entry<PageTransactionInfo, AtomicInteger> entry : countsToUpdate.entrySet())
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-02 16:16:14 UTC (rev 11632)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-02 16:16:50 UTC (rev 11633)
@@ -1215,26 +1215,26 @@
{
// no need to store if it's the same value
// otherwise the journal will get OME in case of lots of redeliveries
- if (ref.getDeliveryCount() != ref.getPersistedCount())
+ if (ref.getDeliveryCount() == ref.getPersistedCount())
{
- ref.setPersistedCount(ref.getDeliveryCount());
- DeliveryCountUpdateEncoding updateInfo =
- new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());
+ return;
+ }
+ ref.setPersistedCount(ref.getDeliveryCount());
+ DeliveryCountUpdateEncoding updateInfo =
+ new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());
+
readLock();
try
{
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
JournalStorageManager.UPDATE_DELIVERY_COUNT, updateInfo,
-
syncNonTransactional, getContext(syncNonTransactional));
}
-
finally
{
readUnLock();
}
- }
}
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
@@ -2581,13 +2581,6 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.persistence.OperationContext#complete()
- */
- public void complete()
- {
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.persistence.OperationContext#executeOnCompletion(org.hornetq.core.journal.IOAsyncTask)
*/
public void executeOnCompletion(final IOAsyncTask runnable)
@@ -3039,11 +3032,6 @@
public long id;
- public DeleteEncoding()
- {
- super();
- }
-
public DeleteEncoding(final byte recordType, final long id)
{
this.recordType = recordType;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-11-02 16:16:14 UTC (rev 11632)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-11-02 16:16:50 UTC (rev 11633)
@@ -18,10 +18,7 @@
package org.hornetq.core.protocol.stomp;
import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-02 16:16:14 UTC (rev 11632)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-02 16:16:50 UTC (rev 11633)
@@ -18,13 +18,11 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.Executor;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
@@ -253,7 +251,7 @@
Iterator<Entry<String, StompSession>> iterator = transactedSessions.entrySet().iterator();
while (iterator.hasNext())
{
- Map.Entry<String, StompSession> entry = (Map.Entry<String, StompSession>)iterator.next();
+ Map.Entry<String, StompSession> entry = iterator.next();
if (entry.getValue().getConnection() == connection)
{
ServerSession serverSession = entry.getValue().getSession();
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-11-02 16:16:14 UTC (rev 11632)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-11-02 16:16:50 UTC (rev 11633)
@@ -12,21 +12,14 @@
*/
package org.hornetq.core.protocol.stomp;
-import java.io.UnsupportedEncodingException;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.Message;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.OperationContext;
-import org.hornetq.core.protocol.stomp.Stomp.Headers;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.QueueQueryResult;
import org.hornetq.core.server.ServerMessage;
@@ -35,7 +28,6 @@
import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.spi.core.remoting.ReadyListener;
import org.hornetq.utils.ConfigurationHelper;
-import org.hornetq.utils.DataConstants;
import org.hornetq.utils.UUIDGenerator;
/**
@@ -45,8 +37,6 @@
*/
public class StompSession implements SessionCallback
{
- private static final Logger log = Logger.getLogger(StompSession.class);
-
private final StompProtocolManager manager;
private final StompConnection connection;
@@ -239,7 +229,7 @@
Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
while (iterator.hasNext())
{
- Map.Entry<Long, StompSubscription> entry = (Map.Entry<Long, StompSubscription>)iterator.next();
+ Map.Entry<Long, StompSubscription> entry = iterator.next();
long consumerID = entry.getKey();
StompSubscription sub = entry.getValue();
if (id != null && id.equals(sub.getID()))
@@ -257,7 +247,7 @@
Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
while (iterator.hasNext())
{
- Map.Entry<Long, StompSubscription> entry = (Map.Entry<Long, StompSubscription>)iterator.next();
+ Map.Entry<Long, StompSubscription> entry = iterator.next();
StompSubscription sub = entry.getValue();
if (sub.getID().equals(subscriptionID))
{
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-11-02 16:16:14 UTC (rev 11632)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-11-02 16:16:50 UTC (rev 11633)
@@ -18,11 +18,8 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.core.protocol.stomp.HornetQStompException;
-import org.hornetq.core.protocol.stomp.SimpleBytes;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompFrame;
-import org.hornetq.core.protocol.stomp.StompFrame.Header;
/**
*
@@ -31,7 +28,7 @@
public class StompFrameV11 extends StompFrame
{
//stomp 1.1 talks about repetitive headers.
- private List<Header> allHeaders = new ArrayList<Header>();
+ private final List<Header> allHeaders = new ArrayList<Header>();
private String contentType;
public StompFrameV11(String command, Map<String, String> headers, byte[] content)
Modified: trunk/hornetq-core/src/main/javacc/FilterParser.jj
===================================================================
--- trunk/hornetq-core/src/main/javacc/FilterParser.jj 2011-11-02 16:16:14 UTC (rev 11632)
+++ trunk/hornetq-core/src/main/javacc/FilterParser.jj 2011-11-02 16:16:50 UTC (rev 11633)
@@ -35,6 +35,7 @@
import org.hornetq.core.filter.impl.SimpleStringReader;
// CHECKSTYLE:OFF
+// NO_UCD
/**
* A JavaCC 2.0 grammar for HornetQ filters
*
Modified: trunk/hornetq-jms/src/main/java/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- trunk/hornetq-jms/src/main/java/org/hornetq/jms/client/HornetQSession.java 2011-11-02 16:16:14 UTC (rev 11632)
+++ trunk/hornetq-jms/src/main/java/org/hornetq/jms/client/HornetQSession.java 2011-11-02 16:16:50 UTC (rev 11633)
@@ -45,9 +45,6 @@
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.TransactionInProgressException;
-import javax.jms.XAQueueSession;
-import javax.jms.XASession;
-import javax.jms.XATopicSession;
import javax.transaction.xa.XAResource;
import org.hornetq.api.core.HornetQException;
Modified: trunk/hornetq-jms/src/main/java/org/hornetq/jms/client/HornetQTopic.java
===================================================================
--- trunk/hornetq-jms/src/main/java/org/hornetq/jms/client/HornetQTopic.java 2011-11-02 16:16:14 UTC (rev 11632)
+++ trunk/hornetq-jms/src/main/java/org/hornetq/jms/client/HornetQTopic.java 2011-11-02 16:16:50 UTC (rev 11633)
@@ -13,12 +13,9 @@
package org.hornetq.jms.client;
-import javax.jms.JMSException;
import javax.jms.Topic;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.jms.client.HornetQDestination;
/**
* HornetQ implementation of a JMS Topic.
@@ -40,7 +37,7 @@
public static SimpleString createAddressFromName(final String name)
{
- return new SimpleString(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX + name);
+ return new SimpleString(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX + name);
}
// Attributes ----------------------------------------------------
@@ -49,7 +46,7 @@
public HornetQTopic(final String name)
{
- super(HornetQQueue.JMS_TOPIC_ADDRESS_PREFIX + name, name, false, false, null);
+ super(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX + name, name, false, false, null);
}
Modified: trunk/hornetq-jms/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/hornetq-jms/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2011-11-02 16:16:14 UTC (rev 11632)
+++ trunk/hornetq-jms/src/main/java/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2011-11-02 16:16:50 UTC (rev 11633)
@@ -44,7 +44,6 @@
import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerSession;
import org.hornetq.jms.client.HornetQDestination;
-import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
@@ -105,24 +104,24 @@
private static String[] determineJMSDestination(String coreAddress)
{
String[] result = new String[2]; // destination name & type
- if (coreAddress.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
+ if (coreAddress.startsWith(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX))
{
- result[0] = coreAddress.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length());
+ result[0] = coreAddress.substring(HornetQDestination.JMS_QUEUE_ADDRESS_PREFIX.length());
result[1] = "queue";
}
- else if (coreAddress.startsWith(HornetQQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
+ else if (coreAddress.startsWith(HornetQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
{
- result[0] = coreAddress.substring(HornetQQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length());
+ result[0] = coreAddress.substring(HornetQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length());
result[1] = "tempqueue";
}
- else if (coreAddress.startsWith(HornetQQueue.JMS_TOPIC_ADDRESS_PREFIX))
+ else if (coreAddress.startsWith(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX))
{
- result[0] = coreAddress.substring(HornetQQueue.JMS_TOPIC_ADDRESS_PREFIX.length());
+ result[0] = coreAddress.substring(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX.length());
result[1] = "topic";
}
- else if (coreAddress.startsWith(HornetQQueue.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
+ else if (coreAddress.startsWith(HornetQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
{
- result[0] = coreAddress.substring(HornetQQueue.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length());
+ result[0] = coreAddress.substring(HornetQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length());
result[1] = "temptopic";
}
else
@@ -914,44 +913,29 @@
CONNECTION_FACTORY_DESTROYED;
}
- private static List<String> toList(final String commaSeparatedString)
- {
- List<String> list = new ArrayList<String>();
- if (commaSeparatedString == null || commaSeparatedString.trim().length() == 0)
- {
- return list;
- }
- String[] values = commaSeparatedString.split(",");
- for (int i = 0; i < values.length; i++)
- {
- list.add(values[i].trim());
- }
- return list;
- }
-
public String[] listTargetDestinations(String sessionID) throws Exception
{
String[] addresses = server.getHornetQServer().getHornetQServerControl().listTargetAddresses(sessionID);
Map<String, DestinationControl> allDests = new HashMap<String, DestinationControl>();
Object[] queueControls = server.getHornetQServer().getManagementService().getResources(JMSQueueControl.class);
- for (int i = 0; i < queueControls.length; i++)
+ for (Object queueControl2 : queueControls)
{
- JMSQueueControl queueControl = (JMSQueueControl)queueControls[i];
+ JMSQueueControl queueControl = (JMSQueueControl)queueControl2;
allDests.put(queueControl.getAddress(), queueControl);
}
Object[] topicControls = server.getHornetQServer().getManagementService().getResources(TopicControl.class);
- for (int i = 0; i < topicControls.length; i++)
+ for (Object topicControl2 : topicControls)
{
- TopicControl topicControl = (TopicControl)topicControls[i];
+ TopicControl topicControl = (TopicControl)topicControl2;
allDests.put(topicControl.getAddress(), topicControl);
}
List<String> destinations = new ArrayList<String>();
- for (int i = 0; i < addresses.length; i++)
+ for (String addresse : addresses)
{
- DestinationControl control = allDests.get(addresses[i]);
+ DestinationControl control = allDests.get(addresse);
if (control != null)
{
destinations.add(control.getAddress());
Modified: trunk/hornetq-jms/src/main/java/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
===================================================================
--- trunk/hornetq-jms/src/main/java/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2011-11-02 16:16:14 UTC (rev 11632)
+++ trunk/hornetq-jms/src/main/java/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2011-11-02 16:16:50 UTC (rev 11633)
@@ -19,12 +19,9 @@
import java.util.ArrayList;
import java.util.List;
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.jms.JMSFactoryType;
-import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.config.impl.Validators;
import org.hornetq.core.logging.Logger;
13 years, 1 month
JBoss hornetq SVN: r11632 - trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-02 12:16:14 -0400 (Wed, 02 Nov 2011)
New Revision: 11632
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
Synchronize replicationLock while clearing pendingTokens
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-11-02 14:41:27 UTC (rev 11631)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-11-02 16:16:14 UTC (rev 11632)
@@ -72,8 +72,6 @@
private final ResponseHandler responseHandler = new ResponseHandler();
- private CoreRemotingConnection replicatingConnection;
-
private final Channel replicatingChannel;
private boolean started;
@@ -167,9 +165,8 @@
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long, boolean)
*/
- public
- void
- appendCommitRecord(final byte journalID, final long txID, boolean sync, final boolean lineUp) throws Exception
+ public void appendCommitRecord(final byte journalID, final long txID, boolean sync, final boolean lineUp)
+ throws Exception
{
if (enabled)
{
@@ -205,7 +202,8 @@
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendPrepareRecord(byte, long, org.hornetq.core.journal.EncodingSupport, boolean)
*/
- public void appendPrepareRecord(final byte journalID, final long txID, final EncodingSupport transactionData) throws Exception
+ public void appendPrepareRecord(final byte journalID, final long txID, final EncodingSupport transactionData)
+ throws Exception
{
if (enabled)
{
@@ -321,6 +319,8 @@
return;
}
+ synchronized (replicationLock)
+ {
enabled = false;
// Complete any pending operations...
@@ -337,7 +337,7 @@
ReplicationManagerImpl.log.warn("Error completing callback on replication manager", e);
}
}
-
+ }
if (replicatingChannel != null)
{
replicatingChannel.close();
13 years, 1 month
JBoss hornetq SVN: r11631 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: server/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-02 10:41:27 -0400 (Wed, 02 Nov 2011)
New Revision: 11631
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Fixing file leak that will affect Windows when paging
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-11-02 04:15:29 UTC (rev 11630)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-11-02 14:41:27 UTC (rev 11631)
@@ -114,8 +114,17 @@
for (PagingStore store : reloadedStores)
{
- store.start();
- stores.put(store.getStoreName(), store);
+ PagingStore oldStore = stores.get(store.getStoreName());
+ if (oldStore != null)
+ {
+ oldStore.stop();
+ oldStore.start();
+ }
+ else
+ {
+ store.start();
+ stores.put(store.getStoreName(), store);
+ }
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-11-02 04:15:29 UTC (rev 11630)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-11-02 14:41:27 UTC (rev 11631)
@@ -426,6 +426,10 @@
{
currentPageId = 0;
+ if (currentPage != null)
+ {
+ currentPage.close();
+ }
currentPage = null;
List<String> files = fileFactory.listFiles("page");
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-02 04:15:29 UTC (rev 11630)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-02 14:41:27 UTC (rev 11631)
@@ -317,6 +317,7 @@
public synchronized void start() throws Exception
{
+ log.debug("Starting server " + this);
OperationContextImpl.clearContext();
try
13 years, 1 month
JBoss hornetq SVN: r11630 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-02 00:15:29 -0400 (Wed, 02 Nov 2011)
New Revision: 11630
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Fixing Windows test
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-11-02 03:05:57 UTC (rev 11629)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-11-02 04:15:29 UTC (rev 11630)
@@ -307,6 +307,21 @@
{
return otherPage.getPageId() - this.pageId;
}
+
+ public void finalize()
+ {
+ try
+ {
+ if (file != null && file.isOpen())
+ {
+ file.close();
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
/* (non-Javadoc)
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-11-02 03:05:57 UTC (rev 11629)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-11-02 04:15:29 UTC (rev 11630)
@@ -74,6 +74,22 @@
return false;
}
+ /**
+ *
+ */
+ public LargeMessageTest()
+ {
+ super();
+ }
+
+ /**
+ * @param test
+ */
+ public LargeMessageTest(String test)
+ {
+ super(test);
+ }
+
public void testRollbackPartiallyConsumedBuffer() throws Exception
{
for (int i = 0 ; i < 1; i++)
@@ -1713,6 +1729,7 @@
100);
}
+
public void testPageOnLargeMessage() throws Exception
{
testPageOnLargeMessage(true, false);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2011-11-02 03:05:57 UTC (rev 11629)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2011-11-02 04:15:29 UTC (rev 11630)
@@ -69,7 +69,18 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+
+ public LargeMessageTestBase(String test)
+ {
+ super(test);
+ }
+
+ public LargeMessageTestBase()
+ {
+ super();
+ }
+
@Override
protected void tearDown() throws Exception
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-11-02 03:05:57 UTC (rev 11629)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-11-02 04:15:29 UTC (rev 11630)
@@ -111,7 +111,7 @@
// Attributes ----------------------------------------------------
- private static final String testDir = System.getProperty("java.io.tmpdir", "/tmp") + "/hornetq-unit-test";
+ private static final String testDir = System.getProperty("java.io.tmpdir", "/tmp") + File.separator + "hornetq-unit-test";
// There is a verification about thread leakages. We only fail a single thread when this happens
private static Set<Thread> alreadyFailedThread = new HashSet<Thread>();
@@ -1168,10 +1168,20 @@
for (int j = 0; j < files.length; j++)
{
- if (!deleteDirectory(new File(directory, files[j])))
+ try
{
- return false;
+
+ File fileTmp = new File(directory, files[j]);
+ if (!deleteDirectory(fileTmp))
+ {
+ log.warn("Couldn't delete " + fileTmp);
+ return false;
+ }
}
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ }
}
}
13 years, 1 month
JBoss hornetq SVN: r11629 - in trunk: hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11 and 2 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-11-01 23:05:57 -0400 (Tue, 01 Nov 2011)
New Revision: 11629
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
Fix STOMP heart-beat issue:
The heart-beat should be sent as a single 'new line' byte rather than a STOMP frame, as per spec
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-11-02 03:05:57 UTC (rev 11629)
@@ -58,6 +58,8 @@
protected boolean disconnect;
+ protected boolean isPing;
+
public StompFrame(String command)
{
this(command, false);
@@ -109,6 +111,16 @@
out += body;
return out;
}
+
+ public boolean isPing()
+ {
+ return isPing;
+ }
+
+ public void setPing(boolean ping)
+ {
+ isPing = ping;
+ }
public HornetQBuffer toHornetQBuffer() throws Exception
{
@@ -123,6 +135,12 @@
buffer = HornetQBuffers.dynamicBuffer(512);
}
+ if (isPing())
+ {
+ buffer.writeByte((byte)10);
+ return buffer;
+ }
+
StringBuffer head = new StringBuffer();
head.append(command);
head.append(Stomp.NEWLINE);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-02 03:05:57 UTC (rev 11629)
@@ -126,6 +126,7 @@
{
response = new HornetQStompException("Encoding error.", e).getFrame();
}
+
return response;
}
@@ -193,7 +194,7 @@
@Override
public StompFrame onSend(StompFrame frame)
- {
+ {
StompFrame response = null;
try
{
@@ -492,7 +493,7 @@
public StompFrame createPingFrame() throws UnsupportedEncodingException
{
StompFrame frame = new StompFrame(Stomp.Commands.STOMP);
- frame.setBody("\n");
+ frame.setPing(true);
return frame;
}
@@ -658,31 +659,37 @@
buffer.readBytes(decoder.workingBuffer, decoder.data, readable);
decoder.data += readable;
-
+
if (decoder.command == null)
{
- if (decoder.data < 4)
+ int offset = 0;
+
+ //check for ping
+ while (decoder.workingBuffer[offset] == StompDecoder.NEW_LINE)
{
- // Need at least four bytes to identify the command
- // - up to 3 bytes for the command name + potentially another byte for a leading \n
-
- return null;
- }
-
- int offset;
-
- if (decoder.workingBuffer[0] == StompDecoder.NEW_LINE)
- {
+ if (heartBeater != null)
+ {
+ //client ping
+ heartBeater.pingAccepted();
+ }
// Yuck, some badly behaved STOMP clients add a \n *after* the terminating NUL char at the end of the
// STOMP
// frame this can manifest as an extra \n at the beginning when the next STOMP frame is read - we need to
// deal
// with this
- offset = 1;
+ offset++;
+ if (offset >= decoder.data)
+ {
+ decoder.data = 0;
+ return null;
+ }
}
- else
+
+ if (decoder.data < 4)
{
- offset = 0;
+ // Need at least four bytes to identify the command
+ // - up to 3 bytes for the command name + potentially another byte for a leading \n
+ return null;
}
byte b = decoder.workingBuffer[offset];
@@ -1025,7 +1032,7 @@
}
// Now the body
-
+
byte[] content = null;
if (decoder.contentLength != -1)
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-11-02 03:05:57 UTC (rev 11629)
@@ -25,7 +25,7 @@
* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
*
*/
-public class AbstractClientStompFrame implements ClientStompFrame
+public abstract class AbstractClientStompFrame implements ClientStompFrame
{
protected static final String HEADER_RECEIPT = "receipt";
@@ -56,6 +56,13 @@
@Override
public ByteBuffer toByteBuffer() throws UnsupportedEncodingException
{
+ if (isPing())
+ {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(1);
+ buffer.put((byte)0x0A);
+ buffer.rewind();
+ return buffer;
+ }
StringBuffer sb = new StringBuffer();
sb.append(command + "\n");
int n = headers.size();
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-11-02 03:05:57 UTC (rev 11629)
@@ -177,8 +177,8 @@
if (validateFrame(frame))
{
- frameQueue.offer(frame);
- receiveList.clear();
+ frameQueue.offer(frame);
+ receiveList.clear();
}
else
{
@@ -188,13 +188,25 @@
}
else
{
- receiveList.add(b);
+ if (b == 10 && receiveList.size() == 0)
+ {
+ //may be a ping
+ incrementServerPing();
+ }
+ else
+ {
+ receiveList.add(b);
+ }
}
}
//clear readbuffer
readBuffer.rewind();
}
+ protected void incrementServerPing()
+ {
+ }
+
private boolean validateFrame(ClientStompFrame f) throws UnsupportedEncodingException
{
String h = f.getHeader("content-length");
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java 2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java 2011-11-02 03:05:57 UTC (rev 11629)
@@ -42,4 +42,6 @@
public ByteBuffer toByteBufferWithExtra(String str) throws UnsupportedEncodingException;
+ public boolean isPing();
+
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java 2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java 2011-11-02 03:05:57 UTC (rev 11629)
@@ -25,5 +25,11 @@
{
super(command);
}
+
+ @Override
+ public boolean isPing()
+ {
+ return false;
+ }
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java 2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java 2011-11-02 03:05:57 UTC (rev 11629)
@@ -21,6 +21,7 @@
public class ClientStompFrameV11 extends AbstractClientStompFrame
{
boolean forceOneway = false;
+ boolean isPing = false;
public ClientStompFrameV11(String command)
{
@@ -43,4 +44,14 @@
}
return false;
}
+
+ public void setPing(boolean b)
+ {
+ isPing = b;
+ }
+
+ public boolean isPing()
+ {
+ return isPing;
+ }
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-11-02 03:05:57 UTC (rev 11629)
@@ -52,6 +52,8 @@
void destroy();
ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException, InterruptedException;
+
+ int getServerPingNumber();
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-11-02 03:05:57 UTC (rev 11629)
@@ -88,14 +88,16 @@
@Override
public void startPinger(long interval)
{
- // TODO Auto-generated method stub
-
}
@Override
public void stopPinger()
{
- // TODO Auto-generated method stub
-
}
+
+ @Override
+ public int getServerPingNumber()
+ {
+ return 0;
+ }
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-11-02 03:05:57 UTC (rev 11629)
@@ -29,6 +29,7 @@
public static final String RECEIPT_HEADER = "receipt";
private Pinger pinger;
+ private volatile int serverPingCounter;
public StompClientConnectionV11(String host, int port) throws IOException
{
@@ -126,6 +127,7 @@
public void disconnect() throws IOException, InterruptedException
{
stopPinger();
+
ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
frame.addHeader("receipt", "1");
@@ -184,6 +186,7 @@
pingFrame = (ClientStompFrameV11) createFrame("STOMP");
pingFrame.setBody("\n");
pingFrame.setForceOneway();
+ pingFrame.setPing(true);
}
public void startPing()
@@ -205,12 +208,8 @@
{
try
{
- System.out.println("============sending ping");
-
sendFrame(pingFrame);
- System.out.println("Pinged " + pingFrame);
-
this.wait(pingInterval);
}
catch (Exception e)
@@ -219,9 +218,20 @@
e.printStackTrace();
}
}
- System.out.println("Pinger stopped");
}
}
}
+ @Override
+ public int getServerPingNumber()
+ {
+ return serverPingCounter;
+ }
+
+ protected void incrementServerPing()
+ {
+ serverPingCounter++;
+ }
+
+
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-11-02 03:05:57 UTC (rev 11629)
@@ -55,6 +55,7 @@
protected void tearDown() throws Exception
{
+ System.out.println("Connection 11 : " + connV11.isConnected());
if (connV11.isConnected())
{
connV11.disconnect();
@@ -238,6 +239,7 @@
//unsub
ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
newConn.disconnect();
}
@@ -491,9 +493,8 @@
ClientStompFrame reply = connV11.sendFrame(frame);
assertEquals("CONNECTED", reply.getCommand());
-
assertEquals("500,500", reply.getHeader("heart-beat"));
-
+
connV11.disconnect();
//heart-beat (500,1000)
@@ -518,7 +519,7 @@
Thread.sleep(10000);
//now check the frame size
- int size = connV11.getFrameQueueSize();
+ int size = connV11.getServerPingNumber();
System.out.println("ping received: " + size);
@@ -533,12 +534,211 @@
//send will be ok
connV11.sendFrame(frame);
- connV11.stopPinger();
-
connV11.disconnect();
+ }
+
+ public void testSendWithHeartBeatsAndReceive() throws Exception
+ {
+ StompClientConnection newConn = null;
+ try
+ {
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+ connV11.sendFrame(frame);
+
+ connV11.startPinger(500);
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+
+ for (int i = 0; i < 10; i++)
+ {
+ frame.setBody("Hello World " + i + "!");
+ connV11.sendFrame(frame);
+ Thread.sleep(500);
+ }
+
+ // subscribe
+ newConn = StompClientConnectionFactory.createClientConnection("1.1",
+ hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ int cnt = 0;
+
+ frame = newConn.receiveFrame();
+
+ while (frame != null)
+ {
+ cnt++;
+ Thread.sleep(500);
+ frame = newConn.receiveFrame(5000);
+ }
+
+ assertEquals(10, cnt);
+
+ // unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ if (newConn != null)
+ newConn.disconnect();
+ connV11.disconnect();
+ }
}
+ public void testSendAndReceiveWithHeartBeats() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+
+ for (int i = 0; i < 10; i++)
+ {
+ frame.setBody("Hello World " + i + "!");
+ connV11.sendFrame(frame);
+ Thread.sleep(500);
+ }
+
+ //subscribe
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ try
+ {
+ frame = newConn.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ newConn.sendFrame(frame);
+
+ newConn.startPinger(500);
+
+ Thread.sleep(500);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ int cnt = 0;
+
+ frame = newConn.receiveFrame();
+
+ while (frame != null)
+ {
+ cnt++;
+ Thread.sleep(500);
+ frame = newConn.receiveFrame(5000);
+ }
+
+ assertEquals(10, cnt);
+
+ // unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ newConn.disconnect();
+ }
+ }
+
+ public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception
+ {
+ StompClientConnection newConn = null;
+ try
+ {
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ connV11.sendFrame(frame);
+
+ connV11.startPinger(500);
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+
+ for (int i = 0; i < 10; i++)
+ {
+ frame.setBody("Hello World " + i + "!");
+ connV11.sendFrame(frame);
+ Thread.sleep(500);
+ }
+
+ // subscribe
+ newConn = StompClientConnectionFactory.createClientConnection("1.1",
+ hostname, port);
+ frame = newConn.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ newConn.sendFrame(frame);
+
+ newConn.startPinger(500);
+
+ Thread.sleep(500);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ int cnt = 0;
+
+ frame = newConn.receiveFrame();
+
+ while (frame != null)
+ {
+ cnt++;
+ Thread.sleep(500);
+ frame = newConn.receiveFrame(5000);
+ }
+ assertEquals(10, cnt);
+
+ // unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ if (newConn != null)
+ newConn.disconnect();
+ connV11.disconnect();
+ }
+ }
+
public void testNack() throws Exception
{
connV11.connect(defUser, defPass);
13 years, 1 month
JBoss hornetq SVN: r11628 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-01 21:16:08 -0400 (Tue, 01 Nov 2011)
New Revision: 11628
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
Setting ClassLoader on MessageConsumers
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-11-01 23:56:50 UTC (rev 11627)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-11-02 01:16:08 UTC (rev 11628)
@@ -14,6 +14,8 @@
package org.hornetq.core.client.impl;
import java.io.File;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.Iterator;
import java.util.concurrent.Executor;
@@ -116,6 +118,8 @@
private final SessionQueueQueryResponseMessage queueInfo;
private volatile boolean ackIndividually;
+
+ private final ClassLoader contextClassLoader;
// Constructors
// ---------------------------------------------------------------------------------
@@ -130,7 +134,8 @@
final TokenBucketLimiter rateLimiter,
final Executor executor,
final Channel channel,
- final SessionQueueQueryResponseMessage queueInfo)
+ final SessionQueueQueryResponseMessage queueInfo,
+ final ClassLoader contextClassLoader)
{
this.id = id;
@@ -153,6 +158,8 @@
this.ackBatchSize = ackBatchSize;
this.queueInfo = queueInfo;
+
+ this.contextClassLoader = contextClassLoader;
}
// ClientConsumer implementation
@@ -861,7 +868,7 @@
{
return;
}
-
+
session.workDone();
// We pull the message from the buffer from inside the Runnable so we can ensure priority
@@ -894,6 +901,8 @@
return;
}
+
+
boolean expired = message.isExpired();
flowControlBeforeConsumption(message);
@@ -906,8 +915,33 @@
{
ClientConsumerImpl.log.trace("Calling handler.onMessage");
}
- theHandler.onMessage(message);
+ final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+ {
+ public ClassLoader run()
+ {
+ ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+
+ return originalLoader;
+ }
+ });
+ try
+ {
+ theHandler.onMessage(message);
+ }
+ finally
+ {
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ public Object run()
+ {
+ Thread.currentThread().setContextClassLoader(originalLoader);
+ return null;
+ }
+ });
+ }
+
if (ClientConsumerImpl.trace)
{
ClientConsumerImpl.log.trace("Handler.onMessage done");
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-01 23:56:50 UTC (rev 11627)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-02 01:16:08 UTC (rev 11628)
@@ -12,6 +12,8 @@
*/
package org.hornetq.core.client.impl;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -82,6 +84,7 @@
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.remoting.Connection;
+import org.hornetq.spi.core.remoting.ConnectorFactory;
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.IDGenerator;
import org.hornetq.utils.SimpleIDGenerator;
@@ -1779,7 +1782,8 @@
: null,
executor,
channel,
- queueInfo);
+ queueInfo,
+ lookupTCCL());
addConsumer(consumer);
@@ -1849,7 +1853,19 @@
throw new HornetQException(HornetQException.OBJECT_CLOSED, "Session is closed");
}
}
+
+ private ClassLoader lookupTCCL()
+ {
+ return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+ {
+ public ClassLoader run()
+ {
+ return Thread.currentThread().getContextClassLoader();
+ }
+ });
+ }
+
private void doCleanup(boolean failingOver)
{
if (remotingConnection == null)
13 years, 1 month
JBoss hornetq SVN: r11627 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-01 19:56:50 -0400 (Tue, 01 Nov 2011)
New Revision: 11627
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
Log:
Fixing delete issue on LargeMessageTest (it was failing intermittently)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-01 21:09:46 UTC (rev 11626)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-01 23:56:50 UTC (rev 11627)
@@ -1275,11 +1275,11 @@
log.info("Deleting unreferenced message id=" + msg.getMessageID() + " from the journal");
try
{
- deleteMessage(msg.getMessageID());
+ deleteMessage(msg.getMessageID());
}
catch (Exception ignored)
{
- log.warn("It wasn't possible to delete message " + msg.getMessageID());
+ log.warn("It wasn't possible to delete message " + msg.getMessageID(), ignored);
}
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-11-01 21:09:46 UTC (rev 11626)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-11-01 23:56:50 UTC (rev 11627)
@@ -170,11 +170,21 @@
public synchronized void incrementDelayDeletionCount()
{
delayDeletionCount.incrementAndGet();
+ try
+ {
+ incrementRefCount();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
public synchronized void decrementDelayDeletionCount() throws Exception
{
int count = delayDeletionCount.decrementAndGet();
+
+ decrementRefCount();
if (count == 0)
{
13 years, 1 month
JBoss hornetq SVN: r11626 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2011-11-01 17:09:46 -0400 (Tue, 01 Nov 2011)
New Revision: 11626
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
Log:
Fixing rare deadlock on flow control
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2011-11-01 19:49:48 UTC (rev 11625)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditManagerImpl.java 2011-11-01 21:09:46 UTC (rev 11626)
@@ -48,29 +48,44 @@
this.windowSize = windowSize;
}
- public synchronized ClientProducerCredits getCredits(final SimpleString address, final boolean anon)
+ public ClientProducerCredits getCredits(final SimpleString address, final boolean anon)
{
- ClientProducerCredits credits = producerCredits.get(address);
-
- if (credits == null)
+ boolean needInit = false;
+ ClientProducerCredits credits;
+
+ synchronized(this)
{
- // Doesn't need to be fair since session is single threaded
- credits = new ClientProducerCreditsImpl(session, address, windowSize);
-
- producerCredits.put(address, credits);
+ credits = producerCredits.get(address);
+
+ if (credits == null)
+ {
+ // Doesn't need to be fair since session is single threaded
+ credits = new ClientProducerCreditsImpl(session, address, windowSize);
+ needInit = true;
+
+ producerCredits.put(address, credits);
+ }
+
+ if (!anon)
+ {
+ credits.incrementRefCount();
+
+ // Remove from anon credits (if there)
+ unReferencedCredits.remove(address);
+ }
+ else
+ {
+ addToUnReferencedCache(address, credits);
+ }
}
-
- if (!anon)
+
+ // The init is done outside of the lock
+ // otherwise packages may arrive with flow control
+ // while this is still sending requests causing a dead lock
+ if (needInit)
{
- credits.incrementRefCount();
-
- // Remove from anon credits (if there)
- unReferencedCredits.remove(address);
+ credits.init();
}
- else
- {
- addToUnReferencedCache(address, credits);
- }
return credits;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2011-11-01 19:49:48 UTC (rev 11625)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2011-11-01 21:09:46 UTC (rev 11626)
@@ -27,6 +27,8 @@
void receiveCredits(int credits);
boolean isBlocked();
+
+ void init();
void reset();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-11-01 19:49:48 UTC (rev 11625)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-11-01 21:09:46 UTC (rev 11626)
@@ -58,7 +58,10 @@
// Doesn't need to be fair since session is single threaded
semaphore = new Semaphore(0, false);
-
+ }
+
+ public void init()
+ {
// We initial request twice as many credits as we request in subsequent requests
// This allows the producer to keep sending as more arrive, minimising pauses
checkCredits(windowSize);
13 years, 1 month
JBoss hornetq SVN: r11625 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/config and 8 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-01 15:49:48 -0400 (Tue, 01 Nov 2011)
New Revision: 11625
Modified:
branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/tests/config/ConfigurationTest-full-config.xml
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7455 - adding min-large-message-size to bridges and cluster-connections
Modified: branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd 2011-11-01 19:49:48 UTC (rev 11625)
@@ -313,6 +313,8 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="transformer-class-name" type="xsd:string">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="min-large-message-size" type="xsd:int">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="check-period" type="xsd:long">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="connection-ttl" type="xsd:long">
@@ -364,6 +366,8 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="connection-ttl" type="xsd:long">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="min-large-message-size" type="xsd:int">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="call-timeout" type="xsd:long">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="retry-interval" type="xsd:long">
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/BridgeConfiguration.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -64,6 +64,8 @@
private final long connectionTTL;
private final long maxRetryInterval;
+
+ private final int minLargeMessageSize;
public BridgeConfiguration(final String name,
@@ -71,6 +73,7 @@
final String forwardingAddress,
final String filterString,
final String transformerClassName,
+ final int minLargeMessageSize,
final long clientFailureCheckPeriod,
final long connectionTTL,
final long retryInterval,
@@ -87,6 +90,7 @@
this.name = name;
this.queueName = queueName;
this.forwardingAddress = forwardingAddress;
+ this.minLargeMessageSize = minLargeMessageSize;
this.filterString = filterString;
this.transformerClassName = transformerClassName;
this.retryInterval = retryInterval;
@@ -108,6 +112,7 @@
final String forwardingAddress,
final String filterString,
final String transformerClassName,
+ final int minLargeMessageSize,
final long clientFailureCheckPeriod,
final long connectionTTL,
final long retryInterval,
@@ -126,6 +131,7 @@
this.forwardingAddress = forwardingAddress;
this.filterString = filterString;
this.transformerClassName = transformerClassName;
+ this.minLargeMessageSize = minLargeMessageSize;
this.retryInterval = retryInterval;
this.retryIntervalMultiplier = retryIntervalMultiplier;
this.reconnectAttempts = reconnectAttempts;
@@ -244,6 +250,14 @@
}
/**
+ * @return the minLargeMessageSize
+ */
+ public int getMinLargeMessageSize()
+ {
+ return minLargeMessageSize;
+ }
+
+ /**
* @param forwardingAddress the forwardingAddress to set
*/
public void setForwardingAddress(final String forwardingAddress)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -66,6 +66,8 @@
private final boolean allowDirectConnectionsOnly;
+ private final int minLargeMessageSize;
+
public ClusterConnectionConfiguration(final String name,
final String address,
final String connectorName,
@@ -80,6 +82,7 @@
this(name,
address,
connectorName,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
retryInterval,
@@ -99,6 +102,7 @@
public ClusterConnectionConfiguration(final String name,
final String address,
final String connectorName,
+ final int minLargeMessageSize,
final long clientFailureCheckPeriod,
final long connectionTTL,
final long retryInterval,
@@ -130,6 +134,7 @@
this.maxHops = maxHops;
this.confirmationWindowSize = confirmationWindowSize;
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
+ this.minLargeMessageSize = minLargeMessageSize;
}
@@ -146,6 +151,7 @@
this(name,
address,
connectorName,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
retryInterval,
@@ -164,6 +170,7 @@
public ClusterConnectionConfiguration(final String name,
final String address,
final String connectorName,
+ final int minLargeMessageSize,
final long clientFailureCheckPeriod,
final long connectionTTL,
final long retryInterval,
@@ -193,6 +200,7 @@
this.staticConnectors = null;
this.maxHops = maxHops;
this.confirmationWindowSize = confirmationWindowSize;
+ this.minLargeMessageSize = minLargeMessageSize;
allowDirectConnectionsOnly = false;
}
@@ -295,4 +303,15 @@
{
return allowDirectConnectionsOnly;
}
+
+
+ /**
+ * @return the minLargeMessageSize
+ */
+ public int getMinLargeMessageSize()
+ {
+ return minLargeMessageSize;
+ }
+
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -1021,6 +1021,8 @@
double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(e, "retry-interval-multiplier",
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER, Validators.GT_ZERO);
+ int minLargeMessageSize = XMLConfigurationUtil.getInteger(e, "min-large-message-size", HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, Validators.GT_ZERO);
+
long maxRetryInterval = XMLConfigurationUtil.getLong(e, "max-retry-interval", ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL, Validators.GT_ZERO);
int reconnectAttempts = XMLConfigurationUtil.getInteger(e, "reconnect-attempts", ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS, Validators.MINUS_ONE_OR_GE_ZERO);
@@ -1065,6 +1067,7 @@
config = new ClusterConnectionConfiguration(name,
address,
connectorName,
+ minLargeMessageSize,
clientFailureCheckPeriod,
connectionTTL,
retryInterval,
@@ -1084,6 +1087,7 @@
config = new ClusterConnectionConfiguration(name,
address,
connectorName,
+ minLargeMessageSize,
clientFailureCheckPeriod,
connectionTTL,
retryInterval,
@@ -1147,6 +1151,11 @@
long connectionTTL = XMLConfigurationUtil.getLong(brNode, "connection-ttl",
HornetQClient.DEFAULT_CONNECTION_TTL, Validators.GT_ZERO) ;
+ int minLargeMessageSize = XMLConfigurationUtil.getInteger(brNode,
+ "min-large-message-size",
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ Validators.GT_ZERO);
+
long maxRetryInterval = XMLConfigurationUtil.getLong(brNode, "max-retry-interval", HornetQClient.DEFAULT_MAX_RETRY_INTERVAL, Validators.GT_ZERO);
@@ -1211,6 +1220,7 @@
forwardingAddress,
filterString,
transformerClassName,
+ minLargeMessageSize,
clientFailureCheckPeriod,
connectionTTL,
retryInterval,
@@ -1231,6 +1241,7 @@
forwardingAddress,
filterString,
transformerClassName,
+ minLargeMessageSize,
clientFailureCheckPeriod,
connectionTTL,
retryInterval,
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -1709,6 +1709,7 @@
forwardingAddress,
filterString,
transformerClassName,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
clientFailureCheckPeriod,
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
@@ -1730,6 +1731,7 @@
forwardingAddress,
filterString,
transformerClassName,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
clientFailureCheckPeriod,
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -142,7 +142,10 @@
private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
private final ClusterManagerInternal manager;
+
+ private final int minLargeMessageSize;
+
// Stuff that used to be on the ClusterManager
private final Topology topology = new Topology(this);
@@ -156,6 +159,7 @@
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
+ final int minLargeMessageSize,
final long clientFailureCheckPeriod,
final long connectionTTL,
final long retryInterval,
@@ -237,6 +241,8 @@
this.manager = manager;
this.callTimeout = callTimeout;
+
+ this.minLargeMessageSize = minLargeMessageSize;
clusterConnector = new StaticClusterConnector(tcConfigs);
@@ -265,6 +271,7 @@
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
+ final int minLargeMessageSize,
final long clientFailureCheckPeriod,
final long connectionTTL,
final long retryInterval,
@@ -310,6 +317,8 @@
this.retryIntervalMultiplier = retryIntervalMultiplier;
this.maxRetryInterval = maxRetryInterval;
+
+ this.minLargeMessageSize = minLargeMessageSize;
this.reconnectAttempts = reconnectAttempts;
@@ -902,6 +911,7 @@
targetLocator.setRetryInterval(retryInterval);
targetLocator.setMaxRetryInterval(maxRetryInterval);
targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
+ targetLocator.setMinLargeMessageSize(minLargeMessageSize);
targetLocator.setAfterConnectionInternalListener(this);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -470,6 +470,7 @@
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
+ serverLocator.setMinLargeMessageSize(config.getMinLargeMessageSize());
if (!config.isUseDuplicateDetection())
{
log.debug("Bridge " + config.getName() +
@@ -622,6 +623,7 @@
connector,
new SimpleString(config.getName()),
new SimpleString(config.getAddress()),
+ config.getMinLargeMessageSize(),
config.getClientFailureCheckPeriod(),
config.getConnectionTTL(),
config.getRetryInterval(),
@@ -659,6 +661,7 @@
connector,
new SimpleString(config.getName()),
new SimpleString(config.getAddress()),
+ config.getMinLargeMessageSize(),
config.getClientFailureCheckPeriod(),
config.getConnectionTTL(),
config.getRetryInterval(),
Modified: branches/Branch_2_2_EAP/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- branches/Branch_2_2_EAP/tests/config/ConfigurationTest-full-config.xml 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/tests/config/ConfigurationTest-full-config.xml 2011-11-01 19:49:48 UTC (rev 11625)
@@ -147,6 +147,7 @@
<forwarding-address>bridge-forwarding-address1</forwarding-address>
<filter string="sku > 1"/>
<transformer-class-name>org.foo.BridgeTransformer</transformer-class-name>
+ <min-large-message-size>4</min-large-message-size>
<retry-interval>3</retry-interval>
<retry-interval-multiplier>0.2</retry-interval-multiplier>
<reconnect-attempts>2</reconnect-attempts>
@@ -166,6 +167,7 @@
<cluster-connection name="cluster-connection1">
<address>queues1</address>
<connector-ref>connector1</connector-ref>
+ <min-large-message-size>321</min-large-message-size>
<call-timeout>123</call-timeout>
<retry-interval>3</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -124,6 +124,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
@@ -268,6 +269,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
@@ -396,6 +398,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
@@ -540,6 +543,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
clientFailureCheckPeriod,
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
@@ -679,6 +683,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -109,6 +109,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
@@ -279,6 +280,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
500,
@@ -496,6 +498,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
@@ -652,6 +655,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -141,6 +141,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
@@ -349,6 +350,7 @@
forwardAddress,
filterString,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
@@ -538,6 +540,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
100,
@@ -698,6 +701,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
100,
@@ -891,6 +895,7 @@
forwardAddress,
null,
SimpleTransformer.class.getName(),
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
@@ -1044,6 +1049,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
@@ -1290,6 +1296,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
@@ -1441,6 +1448,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
@@ -1601,6 +1609,7 @@
// address
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -116,6 +116,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -113,6 +113,7 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -1921,6 +1921,7 @@
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
address,
connectorFrom.getName(),
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
retryInterval,
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -166,6 +166,7 @@
targetQueueConfig.getAddress(),
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
RandomUtil.randomPositiveLong(),
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -144,6 +144,7 @@
targetQueueConfig.getAddress(),
null,
null,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
RandomUtil.randomPositiveLong(),
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2011-10-31 22:43:35 UTC (rev 11624)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2011-11-01 19:49:48 UTC (rev 11625)
@@ -199,6 +199,7 @@
{
Assert.assertEquals("bridge1", bc.getName());
Assert.assertEquals("queue1", bc.getQueueName());
+ Assert.assertEquals(4, bc.getMinLargeMessageSize());
Assert.assertEquals("bridge-forwarding-address1", bc.getForwardingAddress());
Assert.assertEquals("sku > 1", bc.getFilterString());
Assert.assertEquals("org.foo.BridgeTransformer", bc.getTransformerClassName());
@@ -227,6 +228,7 @@
if (ccc.getName().equals("cluster-connection1"))
{
Assert.assertEquals("cluster-connection1", ccc.getName());
+ Assert.assertEquals(321, ccc.getMinLargeMessageSize());
Assert.assertEquals("queues1", ccc.getAddress());
Assert.assertEquals(3, ccc.getRetryInterval());
Assert.assertEquals(true, ccc.isDuplicateDetection());
13 years, 1 month