JBoss hornetq SVN: r11410 - branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-25 03:39:38 -0400 (Sun, 25 Sep 2011)
New Revision: 11410
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
Log:
remove extra log
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-24 20:49:52 UTC (rev 11409)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-25 07:39:38 UTC (rev 11410)
@@ -103,12 +103,10 @@
@Override
public StompFrame onSend(StompFrame frame)
{
- log.error("-------------on Send: " + frame);
StompFrame response = null;
try
{
connection.validate();
- log.error("-----------connection is valid");
String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
@@ -120,13 +118,11 @@
StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
{
- log.error("--------------------------------it's a bryte type");
message.setType(Message.BYTES_TYPE);
message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
}
else
{
- log.error("------------------ it's a text type");
message.setType(Message.TEXT_TYPE);
String text = frame.getBody();
message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
@@ -336,13 +332,11 @@
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
{
- log.error("------------------- server message is byte");
frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
buffer.readBytes(data);
}
else
{
- log.error("------------------- server message is text");
SimpleString text = buffer.readNullableSimpleString();
if (text != null)
{
@@ -377,7 +371,6 @@
@Override
public void replySent(StompFrame reply)
{
- log.error("-----------------------need destroy? " + reply.needsDisconnect());
if (reply.needsDisconnect())
{
connection.destroy();
13 years, 3 months
JBoss hornetq SVN: r11409 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-24 16:49:52 -0400 (Sat, 24 Sep 2011)
New Revision: 11409
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
fixing test
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-24 16:16:41 UTC (rev 11408)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-24 20:49:52 UTC (rev 11409)
@@ -65,11 +65,13 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.DuplicateIDCache;
@@ -137,11 +139,13 @@
// ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(HornetQServerImpl.class);
-
- // JMS Topics (which are outside of the scope of the core API) will require a dumb subscription with a dummy-filter at this current version
+
+ // JMS Topics (which are outside of the scope of the core API) will require a dumb subscription with a dummy-filter
+ // at this current version
// as a way to keep its existence valid and TCK tests
// That subscription needs an invalid filter, however paging needs to ignore any subscription with this filter.
- // For that reason, this filter needs to be rejected on paging or any other component on the system, and just be ignored for any purpose
+ // For that reason, this filter needs to be rejected on paging or any other component on the system, and just be
+ // ignored for any purpose
// It's declared here as this filter is considered a global ignore
public static final String GENERIC_IGNORED_FILTER = "__HQX=-1";
@@ -151,7 +155,6 @@
// Attributes
// -----------------------------------------------------------------------------------
-
private final Version version;
private final HornetQSecurityManager securityManager;
@@ -161,7 +164,7 @@
private final MBeanServer mbeanServer;
private volatile boolean started;
-
+
private volatile boolean stopped;
private volatile SecurityStore securityStore;
@@ -171,7 +174,7 @@
private volatile QueueFactory queueFactory;
private volatile PagingManager pagingManager;
-
+
private volatile PostOffice postOffice;
private volatile ExecutorService threadPool;
@@ -193,7 +196,7 @@
private volatile RemotingService remotingService;
private volatile ManagementService managementService;
-
+
private volatile ConnectorsService connectorsService;
private MemoryManager memoryManager;
@@ -214,7 +217,7 @@
private boolean initialised;
- // private FailoverManager replicationFailoverManager;
+ // private FailoverManager replicationFailoverManager;
private ReplicationManager replicationManager;
@@ -223,17 +226,16 @@
private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
private volatile GroupingHandler groupingHandler;
-
+
private NodeManager nodeManager;
-
+
// Used to identify the server on tests... useful on debugging testcases
private String identity;
-
+
private Thread backupActivationThread;
private Activation activation;
-
// Constructors
// ---------------------------------------------------------------------------------
@@ -312,74 +314,89 @@
public synchronized void start() throws Exception
{
- stopped = false;
-
- initialiseLogging();
+ OperationContextImpl.clearContext();
- checkJournalDirectory();
+ try
+ {
+ stopped = false;
- nodeManager = createNodeManager(configuration.getJournalDirectory());
+ initialiseLogging();
- nodeManager.start();
+ checkJournalDirectory();
- if (started)
- {
- HornetQServerImpl.log.info((configuration.isBackup() ? "backup" : "live") + " is already started, ignoring the call to start..");
- return;
- }
+ nodeManager = createNodeManager(configuration.getJournalDirectory());
- HornetQServerImpl.log.info((configuration.isBackup() ? "backup" : "live") + " server is starting with configuration " + configuration);
+ nodeManager.start();
- if (configuration.isRunSyncSpeedTest())
- {
- SyncSpeedTest test = new SyncSpeedTest();
-
- test.run();
- }
-
- if (!configuration.isBackup())
- {
- if (configuration.isSharedStore() && configuration.isPersistenceEnabled())
+ if (started)
{
- activation = new SharedStoreLiveActivation();
+ HornetQServerImpl.log.info((configuration.isBackup() ? "backup" : "live") + " is already started, ignoring the call to start..");
+ return;
+ }
- // This should block until the lock is got
+ HornetQServerImpl.log.info((configuration.isBackup() ? "backup" : "live") + " server is starting with configuration " +
+ configuration);
- activation.run();
- }
- else
+ if (configuration.isRunSyncSpeedTest())
{
- activation = new NoSharedStoreLiveActivation();
+ SyncSpeedTest test = new SyncSpeedTest();
- activation.run();
+ test.run();
}
- started = true;
- HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "]" + (this.identity != null ? " (" + identity : ")") + " started");
- }
+ if (!configuration.isBackup())
+ {
+ if (configuration.isSharedStore() && configuration.isPersistenceEnabled())
+ {
+ activation = new SharedStoreLiveActivation();
+ // This should block until the lock is got
- // The activation on fail-back may change the value of isBackup, for that reason we are not using else here
- if (configuration.isBackup())
- {
- if (configuration.isSharedStore())
- {
- activation = new SharedStoreBackupActivation();
+ activation.run();
+ }
+ else
+ {
+ activation = new NoSharedStoreLiveActivation();
+
+ activation.run();
+ }
+ started = true;
+
+ HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() +
+ " [" +
+ nodeManager.getNodeId() +
+ "]" +
+ (this.identity != null ? " (" + identity : ")") +
+ " started");
}
- else
+
+ // The activation on fail-back may change the value of isBackup, for that reason we are not using else here
+ if (configuration.isBackup())
{
- // Replicated
+ if (configuration.isSharedStore())
+ {
+ activation = new SharedStoreBackupActivation();
+ }
+ else
+ {
+ // Replicated
- activation = new SharedNothingBackupActivation();
+ activation = new SharedNothingBackupActivation();
+ }
+
+ backupActivationThread = new Thread(activation, "Activation for server " + this);
+ backupActivationThread.start();
}
- backupActivationThread = new Thread(activation, "Activation for server " + this);
- backupActivationThread.start();
+ // start connector service
+ connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice);
+ connectorsService.start();
}
-
- // start connector service
- connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice);
- connectorsService.start();
+ finally
+ {
+ // this avoids embedded applications using dirty contexts from startup
+ OperationContextImpl.clearContext();
+ }
}
@Override
@@ -400,7 +417,7 @@
stopped = true;
stop(configuration.isFailoverOnServerShutdown());
}
-
+
public void threadDump(final String reason)
{
StringWriter str = new StringWriter();
@@ -445,20 +462,21 @@
}
connectorsService.stop();
- //we stop the groupinghandler before we stop te cluster manager so binding mappings aren't removed in case of failover
+ // we stop the groupinghandler before we stop te cluster manager so binding mappings aren't removed in case of
+ // failover
if (groupingHandler != null)
{
managementService.removeNotificationListener(groupingHandler);
groupingHandler = null;
}
-
+
if (clusterManager != null)
{
clusterManager.stop();
}
}
-
+
// We close all the exception in an attempt to let any pending IO to finish
// to avoid scenarios where the send or ACK got to disk but the response didn't get to the client
// It may still be possible to have this scenario on a real failure (without the use of XA)
@@ -545,9 +563,9 @@
{
memoryManager.stop();
}
-
+
threadPool.shutdown();
-
+
scheduledPool.shutdown();
try
@@ -563,7 +581,6 @@
}
threadPool = null;
-
try
{
if (!scheduledPool.awaitTermination(10, TimeUnit.SECONDS))
@@ -577,7 +594,7 @@
}
threadPool = null;
-
+
scheduledPool = null;
pagingManager = null;
@@ -610,17 +627,20 @@
nodeManager.stop();
nodeManager = null;
-
+
addressSettingsRepository.clearListeners();
-
+
addressSettingsRepository.clearCache();
- HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + tempNodeID + "] stopped");
+ HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() +
+ " [" +
+ tempNodeID +
+ "] stopped");
Logger.reset();
}
- }
+ }
// HornetQServer implementation
// -----------------------------------------------------------
@@ -629,33 +649,33 @@
{
StringWriter str = new StringWriter();
PrintWriter out = new PrintWriter(str);
-
+
out.println("Information about server " + this.identity);
out.println("Cluster Connection:" + this.getClusterManager().describe());
-
+
return str.toString();
}
-
+
public void setIdentity(String identity)
{
this.identity = identity;
}
-
+
public String getIdentity()
{
return identity;
}
-
+
public ScheduledExecutorService getScheduledPool()
{
return scheduledPool;
}
-
+
public ExecutorService getThreadPool()
{
return threadPool;
}
-
+
public Configuration getConfiguration()
{
return configuration;
@@ -665,7 +685,7 @@
{
return mbeanServer;
}
-
+
public PagingManager getPagingManager()
{
return pagingManager;
@@ -695,7 +715,7 @@
{
return securityRepository;
}
-
+
public NodeManager getNodeManager()
{
return nodeManager;
@@ -802,7 +822,7 @@
{
// getSessions is called here in a try to minimize locking the Server while this check is being done
Set<ServerSession> allSessions = getSessions();
-
+
for (ServerSession session : allSessions)
{
String metaValue = session.getMetaData(key);
@@ -811,10 +831,10 @@
return true;
}
}
-
+
return false;
}
-
+
public synchronized List<ServerSession> getSessions(final String connectionID)
{
Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();
@@ -865,7 +885,7 @@
public SimpleString getNodeID()
{
- return nodeManager == null?null:nodeManager.getNodeId();
+ return nodeManager == null ? null : nodeManager.getNodeId();
}
public Queue createQueue(final SimpleString address,
@@ -876,24 +896,24 @@
{
return createQueue(address, queueName, filterString, durable, temporary, false);
}
-
+
public Queue locateQueue(SimpleString queueName) throws Exception
{
Binding binding = postOffice.getBinding(queueName);
-
+
if (binding == null)
{
return null;
}
-
+
Bindable queue = binding.getBindable();
-
+
if (!(queue instanceof Queue))
{
throw new IllegalStateException("locateQueue should only be used to locate queues");
}
-
- return (Queue) binding.getBindable();
+
+ return (Queue)binding.getBindable();
}
public Queue deployQueue(final SimpleString address,
@@ -919,7 +939,7 @@
}
Queue queue = (Queue)binding.getBindable();
-
+
if (queue.getPageSubscription() != null)
{
queue.getPageSubscription().close();
@@ -997,7 +1017,6 @@
return connectorsService;
}
-
public synchronized boolean checkActivate() throws Exception
{
if (configuration.isBackup())
@@ -1081,7 +1100,7 @@
managementService.registerDivert(divert, config);
}
-
+
public void destroyDivert(SimpleString name) throws Exception
{
Binding binding = postOffice.getBinding(name);
@@ -1097,8 +1116,6 @@
postOffice.removeBinding(name);
}
-
-
public void deployBridge(BridgeConfiguration config) throws Exception
{
if (clusterManager != null)
@@ -1106,7 +1123,7 @@
clusterManager.deployBridge(config, true);
}
}
-
+
public void destroyBridge(String name) throws Exception
{
if (clusterManager != null)
@@ -1114,14 +1131,14 @@
clusterManager.destroyBridge(name);
}
}
-
+
public ServerSession getSessionByID(String sessionName)
{
return sessions.get(sessionName);
}
-
+
// PUBLIC -------
-
+
public String toString()
{
if (identity != null)
@@ -1134,8 +1151,6 @@
}
}
-
-
// Package protected
// ----------------------------------------------------------------------------
@@ -1146,30 +1161,30 @@
* Protected so tests can change this behaviour
* @param backupConnector
*/
-// protected FailoverManagerImpl createBackupConnectionFailoverManager(final TransportConfiguration backupConnector,
-// final ExecutorService threadPool,
-// final ScheduledExecutorService scheduledPool)
-// {
-// return new FailoverManagerImpl((ClientSessionFactory)null,
-// backupConnector,
-// null,
-// false,
-// HornetQClient.DEFAULT_CALL_TIMEOUT,
-// HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
-// HornetQClient.DEFAULT_CONNECTION_TTL,
-// 0,
-// 1.0d,
-// 0,
-// 1,
-// false,
-// threadPool,
-// scheduledPool,
-// null);
-// }
+ // protected FailoverManagerImpl createBackupConnectionFailoverManager(final TransportConfiguration backupConnector,
+ // final ExecutorService threadPool,
+ // final ScheduledExecutorService scheduledPool)
+ // {
+ // return new FailoverManagerImpl((ClientSessionFactory)null,
+ // backupConnector,
+ // null,
+ // false,
+ // HornetQClient.DEFAULT_CALL_TIMEOUT,
+ // HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ // HornetQClient.DEFAULT_CONNECTION_TTL,
+ // 0,
+ // 1.0d,
+ // 0,
+ // 1,
+ // false,
+ // threadPool,
+ // scheduledPool,
+ // null);
+ // }
protected PagingManager createPagingManager()
{
-
+
return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
(long)configuration.getJournalBufferSize_NIO(),
scheduledPool,
@@ -1313,11 +1328,9 @@
nodeManager.getUUID(),
configuration.isBackup(),
configuration.isClustered());
-
clusterManager.deploy();
-
remotingService = new RemotingServiceImpl(clusterManager, configuration, this, managementService, scheduledPool);
messagingServerControl = managementService.registerServer(postOffice,
@@ -1341,7 +1354,7 @@
addressSettingsDeployer.start();
}
-
+
deployAddressSettingsFromConfiguration();
storageManager.start();
@@ -1385,15 +1398,14 @@
private void initialisePart2() throws Exception
{
// Load the journal and populate queues, transactions and caches in memory
-
-
+
if (stopped)
{
return;
}
-
+
pagingManager.reloadStores();
-
+
JournalLoadInformation[] journalInfo = loadJournals();
compareJournals(journalInfo);
@@ -1507,13 +1519,17 @@
for (QueueBindingInfo queueBindingInfo : queueBindingInfos)
{
queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
-
- if (queueBindingInfo.getFilterString() == null || !queueBindingInfo.getFilterString().toString().equals(GENERIC_IGNORED_FILTER))
+
+ if (queueBindingInfo.getFilterString() == null || !queueBindingInfo.getFilterString()
+ .toString()
+ .equals(GENERIC_IGNORED_FILTER))
{
Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
-
- PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(), filter, true);
-
+
+ PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress())
+ .getCursorProvier()
+ .createSubscription(queueBindingInfo.getId(), filter, true);
+
Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
queueBindingInfo.getAddress(),
queueBindingInfo.getQueueName(),
@@ -1521,18 +1537,17 @@
subscription,
true,
false);
-
+
Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
-
+
queues.put(queueBindingInfo.getId(), queue);
-
+
postOffice.addBinding(binding);
-
+
managementService.registerAddress(queueBindingInfo.getAddress());
managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
}
-
-
+
}
for (GroupingInfo groupingInfo : groupingInfos)
@@ -1546,7 +1561,7 @@
}
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
-
+
HashSet<Pair<Long, Long>> pendingLargeMessages = new HashSet<Pair<Long, Long>>();
journalInfo[1] = storageManager.loadMessageJournal(postOffice,
@@ -1568,7 +1583,7 @@
cache.load(entry.getValue());
}
}
-
+
for (Pair<Long, Long> msgToDelete : pendingLargeMessages)
{
log.info("Deleting pending large message as it wasn't completed:" + msgToDelete);
@@ -1631,19 +1646,20 @@
}
Filter filter = FilterImpl.createFilter(filterString);
-
+
long queueID = storageManager.generateUniqueID();
PageSubscription pageSubscription;
-
-
+
if (filterString != null && filterString.toString().equals(GENERIC_IGNORED_FILTER))
{
pageSubscription = null;
}
else
{
- pageSubscription = pagingManager.getPageStore(address).getCursorProvier().createSubscription(queueID, filter, durable);
+ pageSubscription = pagingManager.getPageStore(address)
+ .getCursorProvier()
+ .createSubscription(queueID, filter, durable);
}
final Queue queue = queueFactory.createQueue(queueID,
@@ -1759,32 +1775,31 @@
}
else
{
- throw new IllegalArgumentException("Directory " + journalDir +
- " does not exist and will not be created");
+ throw new IllegalArgumentException("Directory " + journalDir + " does not exist and will not be created");
}
}
}
-
+
/**
* To be called by backup trying to fail back the server
*/
private void startFailbackChecker()
{
- scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l, TimeUnit.MILLISECONDS);
+ scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l, TimeUnit.MILLISECONDS);
}
-
// Inner classes
// --------------------------------------------------------------------------------
-
+
class FailbackChecker implements Runnable
{
boolean restarting = false;
+
public void run()
{
try
{
- if(!restarting && nodeManager.isAwaitingFailback())
+ if (!restarting && nodeManager.isAwaitingFailback())
{
log.info("live server wants to restart, restarting server in backup");
restarting = true;
@@ -1819,8 +1834,6 @@
}
}
-
-
private class SharedStoreLiveActivation implements Activation
{
public void run()
@@ -1830,7 +1843,7 @@
log.info("Waiting to obtain live lock");
checkJournalDirectory();
-
+
if (log.isDebugEnabled())
{
log.debug("First part initialization on " + this);
@@ -1838,9 +1851,10 @@
initialisePart1();
- if(nodeManager.isBackupLive())
+ if (nodeManager.isBackupLive())
{
- //looks like we've failed over at some point need to inform that we are the backup so when the current live
+ // looks like we've failed over at some point need to inform that we are the backup so when the current
+ // live
// goes down they failover to us
if (log.isDebugEnabled())
{
@@ -1857,9 +1871,9 @@
{
return;
}
-
+
initialisePart2();
-
+
log.info("Server is now live");
}
catch (Exception e)
@@ -1870,7 +1884,7 @@
public void close(boolean permanently) throws Exception
{
- if(permanently)
+ if (permanently)
{
nodeManager.crashLiveServer();
}
@@ -1881,7 +1895,6 @@
}
}
-
private class SharedStoreBackupActivation implements Activation
{
public void run()
@@ -1891,50 +1904,53 @@
nodeManager.startBackup();
initialisePart1();
-
+
clusterManager.start();
started = true;
- log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started, waiting live to fail before it gets active");
+ log.info("HornetQ Backup Server version " + getVersion().getFullVersion() +
+ " [" +
+ nodeManager.getNodeId() +
+ "] started, waiting live to fail before it gets active");
nodeManager.awaitLiveNode();
-
+
configuration.setBackup(false);
-
+
if (stopped)
{
return;
}
-
+
initialisePart2();
-
+
clusterManager.activate();
log.info("Backup Server is now live");
nodeManager.releaseBackup();
- if(configuration.isAllowAutoFailBack())
+ if (configuration.isAllowAutoFailBack())
{
startFailbackChecker();
}
}
catch (InterruptedException e)
{
- //this is ok, we are being stopped
+ // this is ok, we are being stopped
}
catch (ClosedChannelException e)
{
- //this is ok too, we are being stopped
+ // this is ok too, we are being stopped
}
catch (Exception e)
{
- if(!(e.getCause() instanceof InterruptedException))
+ if (!(e.getCause() instanceof InterruptedException))
{
log.error("Failure in initialisation", e);
}
}
- catch(Throwable e)
+ catch (Throwable e)
{
log.error("Failure in initialisation", e);
}
@@ -1956,7 +1972,7 @@
nodeManager.interrupt();
backupActivationThread.interrupt();
-
+
backupActivationThread.join(1000);
}
@@ -1970,10 +1986,10 @@
}
else
{
- //if we are now live, behave as live
+ // if we are now live, behave as live
// We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
// started before the live
- if(permanently)
+ if (permanently)
{
nodeManager.crashLiveServer();
}
@@ -1984,7 +2000,7 @@
}
}
}
-
+
private interface Activation extends Runnable
{
void close(boolean permanently) throws Exception;
@@ -2044,5 +2060,4 @@
}
}
-
}
13 years, 3 months
JBoss hornetq SVN: r11408 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-24 12:16:41 -0400 (Sat, 24 Sep 2011)
New Revision: 11408
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
fixing a test
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-24 16:15:52 UTC (rev 11407)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-24 16:16:41 UTC (rev 11408)
@@ -3012,7 +3012,7 @@
{
final RefEncoding encoding = new RefEncoding();
encoding.decode(buffer);
- return encoding;
+ return new ReferenceDescribe(encoding);
}
case ACKNOWLEDGE_REF:
13 years, 3 months
JBoss hornetq SVN: r11407 - branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-24 12:15:52 -0400 (Sat, 24 Sep 2011)
New Revision: 11407
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java 2011-09-24 14:24:32 UTC (rev 11406)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java 2011-09-24 16:15:52 UTC (rev 11407)
@@ -17,11 +17,6 @@
*/
package org.hornetq.tests.integration.stomp.v11;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
@@ -35,8 +30,6 @@
import javax.jms.TextMessage;
import javax.jms.Topic;
-import junit.framework.Assert;
-
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-24 14:24:32 UTC (rev 11406)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-24 16:15:52 UTC (rev 11407)
@@ -38,7 +38,9 @@
import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
import org.hornetq.tests.integration.stomp.util.StompClientConnectionV11;
-
+/*
+ *
+ */
public class StompTestV11 extends StompTestBase2
{
private static final transient Logger log = Logger.getLogger(StompTestV11.class);
13 years, 3 months
JBoss hornetq SVN: r11406 - in branches/STOMP11: hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11 and 3 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-24 10:24:32 -0400 (Sat, 24 Sep 2011)
New Revision: 11406
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -601,6 +601,7 @@
selector += " AND " + noLocalFilter;
}
}
+
if (ack == null)
{
ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -119,9 +119,11 @@
Set<SimpleString> names = message.getPropertyNames();
for (SimpleString name : names)
{
+ String value = name.toString();
if (name.equals(ClientMessageImpl.REPLYTO_HEADER_NAME) ||
- name.toString().equals("JMSType") ||
- name.toString().equals("JMSCorrelationID"))
+ value.equals("JMSType") ||
+ value.equals("JMSCorrelationID") ||
+ value.equals(Stomp.Headers.Message.DESTINATION))
{
continue;
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -432,7 +432,7 @@
byte[] data = new byte[size];
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
{
- frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length > 0 ? (data.length - 1) : data.length));
+ frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
buffer.readBytes(data);
}
else
@@ -1036,14 +1036,12 @@
}
else
{
- content = new byte[decoder.contentLength + 1];
+ content = new byte[decoder.contentLength];
System.arraycopy(decoder.workingBuffer, decoder.pos, content, 0, decoder.contentLength);
decoder.pos += decoder.contentLength + 1;
- content[decoder.contentLength] = 0;
-
//drain all the rest
if (decoder.bodyStart == -1)
{
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -192,7 +192,6 @@
public void testSendMessageWithReceipt() throws Exception
{
-
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
@@ -407,6 +406,9 @@
sendMessage(payload, queue);
frame = receiveFrame(10000);
+
+ System.out.println("Message: " + frame);
+
Assert.assertTrue(frame.startsWith("MESSAGE"));
Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -149,6 +149,11 @@
{
return frameQueue.poll(10, TimeUnit.SECONDS);
}
+
+ public ClientStompFrame receiveFrame(long timeout) throws InterruptedException
+ {
+ return frameQueue.poll(timeout, TimeUnit.MILLISECONDS);
+ }
//put bytes to byte array.
private void receiveBytes(int n) throws UnsupportedEncodingException
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -26,6 +26,8 @@
ClientStompFrame receiveFrame() throws InterruptedException;
+ ClientStompFrame receiveFrame(long timeout) throws InterruptedException;;
+
void connect() throws Exception;
void disconnect() throws IOException, InterruptedException;
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -23,15 +23,16 @@
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import junit.framework.Assert;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
import org.hornetq.tests.integration.stomp.util.StompClientConnection;
import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
@@ -936,7 +937,46 @@
Assert.assertNull(message);
}
+
+ public void testTwoSubscribers() throws Exception
+ {
+ connV11.connect(defUser, defPass, "myclientid");
+ this.subscribeTopic(connV11, "sub1", "auto", null);
+
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass, "myclientid2");
+
+ this.subscribeTopic(newConn, "sub2", "auto", null);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getTopicPrefix() + getTopicName());
+
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ // receive message from socket
+ frame = connV11.receiveFrame(1000);
+
+ System.out.println("received frame : " + frame);
+ assertEquals("Hello World", frame.getBody());
+ assertEquals("sub1", frame.getHeader("subscription"));
+
+ frame = newConn.receiveFrame(1000);
+
+ System.out.println("received 2 frame : " + frame);
+ assertEquals("Hello World", frame.getBody());
+ assertEquals("sub2", frame.getHeader("subscription"));
+
+ // remove subscription
+ this.unsubscribe(connV11, "sub1", true);
+ this.unsubscribe(newConn, "sub2", true);
+
+ connV11.disconnect();
+ newConn.disconnect();
+ }
+
//tests below are adapted from StompTest
public void testBeginSameTransactionTwice() throws Exception
{
@@ -1255,8 +1295,8 @@
BytesMessage message = (BytesMessage)consumer.receive(10000);
Assert.assertNotNull(message);
- //there is one extra null byte
- assertEquals(data.length + 1, message.getBodyLength());
+
+ assertEquals(data.length, message.getBodyLength());
assertEquals(data[0], message.readByte());
assertEquals(data[1], message.readByte());
assertEquals(data[2], message.readByte());
@@ -1307,10 +1347,394 @@
long tnow = System.currentTimeMillis();
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+
+ assertNull(consumer.receive(1000));
+
+ connV11.disconnect();
}
+ public void testSendMessageWithReceipt() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("receipt", "1234");
+ frame.setBody("Hello World");
+
+ frame = connV11.sendFrame(frame);
+
+ assertTrue(frame.getCommand().equals("RECEIPT"));
+ assertEquals("1234", frame.getHeader("receipt-id"));
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+
+ connV11.disconnect();
+ }
+
+ public void testSendMessageWithStandardHeaders() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("correlation-id", "c123");
+ frame.addHeader("persistent", "true");
+ frame.addHeader("priority", "3");
+ frame.addHeader("type", "t345");
+ frame.addHeader("JMSXGroupID", "abc");
+ frame.addHeader("foo", "abc");
+ frame.addHeader("bar", "123");
+
+ frame.setBody("Hello World");
+
+ frame = connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
+ Assert.assertEquals("getJMSType", "t345", message.getJMSType());
+ Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+ Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+ Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+
+ Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeToTopic() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribeTopic(connV11, "sub1", null, null, true);
+
+ sendMessage(getName(), topic);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix() + getTopicName()));
+ Assert.assertTrue(frame.getBody().equals(getName()));
+
+ this.unsubscribe(connV11, "sub1", true);
+
+ sendMessage(getName(), topic);
+
+ frame = connV11.receiveFrame(1000);
+ assertNull(frame);
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeToTopicWithNoLocal() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribeTopic(connV11, "sub1", null, null, true, true);
+
+ // send a message on the same connection => it should not be received
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getTopicPrefix() + getTopicName());
+
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ frame = connV11.receiveFrame(2000);
+
+ assertNull(frame);
+
+ // send message on another JMS connection => it should be received
+ sendMessage(getName(), topic);
+
+ frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix() + getTopicName()));
+ Assert.assertTrue(frame.getBody().equals(getName()));
+
+ this.unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithAutoAck() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertEquals("MESSAGE", frame.getCommand());
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertEquals(getName(), frame.getBody());
+
+ connV11.disconnect();
+
+ // message should not be received as it was auto-acked
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testSubscribeWithAutoAckAndBytesMessage() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
+ sendMessage(payload, queue);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ System.out.println("Message: " + frame);
+
+ assertEquals("5", frame.getHeader("content-length"));
+
+ assertEquals(null, frame.getHeader("type"));
+
+ assertEquals(frame.getBody(), new String(payload, "UTF-8"));
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithClientAck() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ this.ack(connV11, "sub1", frame);
+
+ connV11.disconnect();
+
+ // message should not be received since message was acknowledged by the client
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception
+ {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
+ }
+
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception
+ {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
+ }
+
+ public void testSubscribeWithID() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "mysubid", "auto");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getHeader("subscription") != null);
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithMessageSentWithProperties() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ MessageProducer producer = session.createProducer(queue);
+ BytesMessage message = session.createBytesMessage();
+ message.setStringProperty("S", "value");
+ message.setBooleanProperty("n", false);
+ message.setByteProperty("byte", (byte)9);
+ message.setDoubleProperty("d", 2.0);
+ message.setFloatProperty("f", (float)6.0);
+ message.setIntProperty("i", 10);
+ message.setLongProperty("l", 121);
+ message.setShortProperty("s", (short)12);
+ message.writeBytes("Hello World".getBytes("UTF-8"));
+ producer.send(message);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+ Assert.assertNotNull(frame);
+
+ Assert.assertTrue(frame.getHeader("S") != null);
+ Assert.assertTrue(frame.getHeader("n") != null);
+ Assert.assertTrue(frame.getHeader("byte") != null);
+ Assert.assertTrue(frame.getHeader("d") != null);
+ Assert.assertTrue(frame.getHeader("f") != null);
+ Assert.assertTrue(frame.getHeader("i") != null);
+ Assert.assertTrue(frame.getHeader("l") != null);
+ Assert.assertTrue(frame.getHeader("s") != null);
+ Assert.assertEquals("Hello World", frame.getBody());
+
+ connV11.disconnect();
+ }
+
+ public void testSuccessiveTransactionsWithSameID() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ // first tx
+ this.beginTransaction(connV11, "tx1");
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ this.commitTransaction(connV11, "tx1");
+
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ // 2nd tx with same tx ID
+ this.beginTransaction(connV11, "tx1");
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ this.commitTransaction(connV11, "tx1");
+
+ message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ connV11.disconnect();
+ }
+
+ public void testTransactionCommit() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ this.beginTransaction(connV11, "tx1");
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+ frame.addHeader("receipt", "123");
+ frame.setBody("Hello World");
+
+ frame = connV11.sendFrame(frame);
+
+ assertEquals("123", frame.getHeader("receipt-id"));
+
+ // check the message is not committed
+ assertNull(consumer.receive(100));
+
+ this.commitTransaction(connV11, "tx1", true);
+
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ connV11.disconnect();
+ }
+
+ public void testTransactionRollback() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ this.beginTransaction(connV11, "tx1");
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+
+ frame.setBody("first message");
+
+ connV11.sendFrame(frame);
+
+ // rollback first message
+ this.abortTransaction(connV11, "tx1");
+
+ this.beginTransaction(connV11, "tx1");
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+
+ frame.setBody("second message");
+
+ connV11.sendFrame(frame);
+
+ this.commitTransaction(connV11, "tx1", true);
+
+ // only second msg should be received since first msg was rolled back
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("second message", message.getText());
+
+ connV11.disconnect();
+ }
+
+ public void testUnsubscribe() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ // send a message to our queue
+ sendMessage("first message");
+
+ // receive message from socket
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+
+ // remove subscription
+ this.unsubscribe(connV11, "sub1", true);
+
+ // send a message to our queue
+ sendMessage("second message");
+
+ frame = connV11.receiveFrame(1000);
+ assertNull(frame);
+
+ connV11.disconnect();
+ }
+
//-----------------private help methods
-
+
private void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
{
ClientStompFrame abortFrame = conn.createFrame("ABORT");
@@ -1326,7 +1750,27 @@
conn.sendFrame(beginFrame);
}
-
+
+ private void commitTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
+ {
+ commitTransaction(conn, txID, false);
+ }
+
+ private void commitTransaction(StompClientConnection conn, String txID, boolean receipt) throws IOException, InterruptedException
+ {
+ ClientStompFrame beginFrame = conn.createFrame("COMMIT");
+ beginFrame.addHeader("transaction", txID);
+ if (receipt)
+ {
+ beginFrame.addHeader("receipt", "1234");
+ }
+ ClientStompFrame resp = conn.sendFrame(beginFrame);
+ if (receipt)
+ {
+ assertEquals("1234", resp.getHeader("receipt-id"));
+ }
+ }
+
private void ack(StompClientConnection conn, String subId,
ClientStompFrame frame) throws IOException, InterruptedException
{
@@ -1376,14 +1820,30 @@
{
subscribe(conn, subId, ack, durableId, null);
}
+
+ private void subscribe(StompClientConnection conn, String subId,
+ String ack, String durableId, boolean receipt) throws IOException, InterruptedException
+ {
+ subscribe(conn, subId, ack, durableId, null, receipt);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId, String ack,
+ String durableId, String selector) throws IOException,
+ InterruptedException
+ {
+ subscribe(conn, subId, ack, durableId, selector, false);
+ }
private void subscribe(StompClientConnection conn, String subId,
- String ack, String durableId, String selector) throws IOException, InterruptedException
+ String ack, String durableId, String selector, boolean receipt) throws IOException, InterruptedException
{
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
subFrame.addHeader("id", subId);
subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", ack);
+ if (ack != null)
+ {
+ subFrame.addHeader("ack", ack);
+ }
if (durableId != null)
{
subFrame.addHeader("durable-subscriber-name", durableId);
@@ -1392,19 +1852,60 @@
{
subFrame.addHeader("selector", selector);
}
- conn.sendFrame(subFrame);
+ if (receipt)
+ {
+ subFrame.addHeader("receipt", "1234");
+ }
+
+ subFrame = conn.sendFrame(subFrame);
+
+ if (receipt)
+ {
+ assertEquals("1234", subFrame.getHeader("receipt-id"));
+ }
}
private void subscribeTopic(StompClientConnection conn, String subId,
String ack, String durableId) throws IOException, InterruptedException
{
+ subscribeTopic(conn, subId, ack, durableId, false);
+ }
+
+ private void subscribeTopic(StompClientConnection conn, String subId,
+ String ack, String durableId, boolean receipt) throws IOException, InterruptedException
+ {
+ subscribeTopic(conn, subId, ack, durableId, receipt, false);
+ }
+
+ private void subscribeTopic(StompClientConnection conn, String subId,
+ String ack, String durableId, boolean receipt, boolean noLocal) throws IOException, InterruptedException
+ {
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
subFrame.addHeader("id", subId);
subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
- subFrame.addHeader("ack", ack);
- subFrame.addHeader("durable-subscriber-name", durableId);
+ if (ack != null)
+ {
+ subFrame.addHeader("ack", ack);
+ }
+ if (durableId != null)
+ {
+ subFrame.addHeader("durable-subscriber-name", durableId);
+ }
+ if (receipt)
+ {
+ subFrame.addHeader("receipt", "1234");
+ }
+ if (noLocal)
+ {
+ subFrame.addHeader("no-local", "true");
+ }
- conn.sendFrame(subFrame);
+ ClientStompFrame frame = conn.sendFrame(subFrame);
+
+ if (receipt)
+ {
+ assertTrue(frame.getHeader("receipt-id").equals("1234"));
+ }
}
private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException
@@ -1414,7 +1915,77 @@
conn.sendFrame(subFrame);
}
+
+ private void unsubscribe(StompClientConnection conn, String subId,
+ boolean receipt) throws IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
+ subFrame.addHeader("id", subId);
+
+ if (receipt)
+ {
+ subFrame.addHeader("receipt", "4321");
+ }
+
+ ClientStompFrame f = conn.sendFrame(subFrame);
+
+ if (receipt)
+ {
+ System.out.println("response: " + f);
+ assertEquals("RECEIPT", f.getCommand());
+ assertEquals("4321", f.getHeader("receipt-id"));
+ }
+ }
+ protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertEquals("MESSAGE", frame.getCommand());
+
+ log.info("Reconnecting!");
+
+ if (sendDisconnect)
+ {
+ connV11.disconnect();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ }
+ else
+ {
+ connV11.destroy();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ }
+
+ // message should be received since message was not acknowledged
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", null);
+
+ frame = connV11.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+
+ connV11.disconnect();
+
+ // now lets make sure we don't see the message again
+ connV11.destroy();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", null, null, true);
+
+ sendMessage("shouldBeNextMessage");
+
+ frame = connV11.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertEquals("shouldBeNextMessage", frame.getBody());
+ }
+
}
13 years, 3 months
JBoss hornetq SVN: r11405 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/persistence/impl/journal and 10 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-24 02:22:26 -0400 (Sat, 24 Sep 2011)
New Revision: 11405
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
JBPAPP-7161 - fixed partial streamed large messages
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -15,6 +15,7 @@
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import javax.transaction.xa.Xid;
@@ -39,6 +40,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.transaction.Transaction;
/**
*
@@ -95,6 +97,12 @@
long getCurrentUniqueID();
+ // Confirms that a large message was finished
+ void confirmPendingLargeMessageTX(Transaction transaction, long messageID, long recordID) throws Exception;
+
+ // Confirms that a large message was finished
+ void confirmPendingLargeMessage(long recordID) throws Exception;
+
void storeMessage(ServerMessage message) throws Exception;
void storeReference(long queueID, long messageID, boolean last) throws Exception;
@@ -125,8 +133,6 @@
void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception;
- void deleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
-
void storeDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception;
void updateDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception;
@@ -141,8 +147,9 @@
* @param message This is a temporary message that holds the parsed properties.
* The remoting layer can't create a ServerMessage directly, then this will be replaced.
* @return
+ * @throws Exception
*/
- LargeServerMessage createLargeMessage(long id, MessageInternal message);
+ LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception;
void prepare(long txID, Xid xid) throws Exception;
@@ -169,7 +176,8 @@
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
Map<Long, QueueBindingInfo> queueInfos,
- final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception;
+ final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+ final Set<Pair<Long, Long>> pendingLargeMessages) throws Exception;
long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -53,7 +53,6 @@
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
-import org.hornetq.core.journal.impl.ExportJournal;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.JournalReaderCallback;
@@ -98,7 +97,6 @@
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUID;
import org.hornetq.utils.XidCodecSupport;
/**
@@ -130,6 +128,10 @@
public static final byte SECURITY_RECORD = 26;
// Message journal record types
+
+ // This is used when a large message is created but not yet stored on the system.
+ // We use this to avoid temporary files missing
+ public static final byte ADD_LARGE_MESSAGE_PENDING = 29;
public static final byte ADD_LARGE_MESSAGE = 30;
@@ -155,8 +157,6 @@
public static final byte PAGE_CURSOR_COUNTER_INC = 41;
- private UUID persistentID;
-
private final BatchingIDGenerator idGenerator;
private final ReplicationManager replicator;
@@ -453,7 +453,7 @@
}
}
- public LargeServerMessage createLargeMessage(final long id, final MessageInternal message)
+ public LargeServerMessage createLargeMessage(final long id, final MessageInternal message) throws Exception
{
if (isReplicated())
{
@@ -466,11 +466,45 @@
largeMessage.setMessageID(id);
+ if (largeMessage.isDurable())
+ {
+ // We store a marker on the journal that the large file is pending
+ long pendingRecordID = storePendingLargeMessage(id);
+
+ largeMessage.setPendingRecordID(pendingRecordID);
+ }
+
return largeMessage;
}
// Non transactional operations
-
+
+ public long storePendingLargeMessage(final long messageID) throws Exception
+ {
+ long recordID = generateUniqueID();
+
+ messageJournal.appendAddRecord(recordID,
+ ADD_LARGE_MESSAGE_PENDING,
+ new PendingLargeMessageEncoding(messageID),
+ true,
+ getContext(true));
+
+ return recordID;
+ }
+
+ public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long recordID) throws Exception
+ {
+ installLargeMessageConfirmationOnTX(tx, recordID);
+ messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID, new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
+ }
+
+
+ /** We don't need messageID now, but we are likely to need it if we ever decide to support a database */
+ public void confirmPendingLargeMessage(long recordID) throws Exception
+ {
+ messageJournal.appendDeleteRecord(recordID, true, getContext());
+ }
+
public void storeMessage(final ServerMessage message) throws Exception
{
if (message.getMessageID() <= 0)
@@ -690,11 +724,6 @@
encoding);
}
- public void deleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
- {
- messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
- }
-
public void prepare(final long txID, final Xid xid) throws Exception
{
messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional, getContext(syncTransactional));
@@ -830,7 +859,8 @@
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
Map<Long, QueueBindingInfo> queueInfos,
- final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+ final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+ final Set<Pair<Long, Long>> pendingLargeMessages) throws Exception
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -869,6 +899,19 @@
switch (recordType)
{
+ case ADD_LARGE_MESSAGE_PENDING:
+ {
+ PendingLargeMessageEncoding pending = new PendingLargeMessageEncoding();
+
+ pending.decode(buff);
+
+ if (pendingLargeMessages != null)
+ {
+ // it could be null on tests, and we don't need anything on that case
+ pendingLargeMessages.add(new Pair<Long, Long>(record.id, pending.largeMessageID));
+ }
+ break;
+ }
case ADD_LARGE_MESSAGE:
{
LargeServerMessage largeMessage = parseLargeMessage(messages, buff);
@@ -1188,7 +1231,8 @@
queueInfos,
preparedTransactions,
duplicateIDMap,
- pageSubscriptions);
+ pageSubscriptions,
+ pendingLargeMessages);
for (PageSubscription sub : pageSubscriptions.values())
{
@@ -1547,7 +1591,7 @@
// Package protected ---------------------------------------------
// This should be accessed from this package only
- void deleteFile(final SequentialFile file)
+ void deleteLargeMessage(final SequentialFile file)
{
Runnable deleteAction = new Runnable()
{
@@ -1656,7 +1700,8 @@
final Map<Long, QueueBindingInfo> queueInfos,
final List<PreparedTransactionInfo> preparedTransactions,
final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
- final Map<Long, PageSubscription> pageSubscriptions) throws Exception
+ final Map<Long, PageSubscription> pageSubscriptions,
+ final Set<Pair<Long, Long>> pendingLargeMessages) throws Exception
{
// recover prepared transactions
for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
@@ -1746,7 +1791,6 @@
throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
}
- // TODO - this involves a scan - we should find a quicker way of doing it
MessageReference removed = queue.removeReferenceWithID(messageID);
if (removed == null)
@@ -1867,32 +1911,29 @@
}
}
- for (RecordInfo record : preparedTransaction.recordsToDelete)
+ for (RecordInfo recordDeleted : preparedTransaction.recordsToDelete)
{
- byte[] data = record.data;
-
- HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
-
- long messageID = record.id;
-
- DeleteEncoding encoding = new DeleteEncoding();
-
- encoding.decode(buff);
-
- Queue queue = queues.get(encoding.queueID);
-
- if (queue == null)
+ byte[] data = recordDeleted.data;
+
+ if (data.length > 0)
{
- throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+ byte b = buff.readByte();
+
+ switch (b)
+ {
+ case ADD_LARGE_MESSAGE_PENDING:
+ {
+ long messageID = buff.readLong();
+ pendingLargeMessages.remove(new Pair<Long, Long>(recordDeleted.id, messageID));
+ installLargeMessageConfirmationOnTX(tx, recordDeleted.id);
+ break;
+ }
+ default:
+ log.warn("can't locate recordType=" + b + " on loadPreparedTransaction//deleteRecords");
+ }
}
-
- MessageReference removed = queue.removeReferenceWithID(messageID);
-
- if (removed != null)
- {
- referencesToAck.add(removed);
- }
-
+
}
for (MessageReference ack : referencesToAck)
@@ -2301,6 +2342,50 @@
}
+ public static class PendingLargeMessageEncoding implements EncodingSupport
+ {
+ public long largeMessageID; // the only encoded field: message id of the large message this record refers to
+
+ public PendingLargeMessageEncoding(final long pendingLargeMessageID)
+ {
+ this.largeMessageID = pendingLargeMessageID;
+ }
+
+ public PendingLargeMessageEncoding()
+ {
+ }
+
+ /* (non-Javadoc) Reads largeMessageID back as a single long.
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(final HornetQBuffer buffer)
+ {
+ largeMessageID = buffer.readLong();
+ }
+
+ /* (non-Javadoc) Writes largeMessageID as a single long.
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(largeMessageID);
+ }
+
+ /* (non-Javadoc) The encoded form is exactly one long, matching encode/decode.
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_LONG;
+ }
+
+ public String toString()
+ {
+ return "PendingLargeMessageEncoding::MessageID=" + largeMessageID;
+ }
+
+ }
+
public static class DeliveryCountUpdateEncoding implements EncodingSupport
{
public long queueID;
@@ -2388,17 +2473,48 @@
}
- public static class DeleteEncoding extends QueueEncoding
+ public static class DeleteEncoding implements EncodingSupport
{
+ public byte recordType;
+
+ public long id;
+
public DeleteEncoding()
{
super();
}
- public DeleteEncoding(final long queueID)
+ public DeleteEncoding(final byte recordType, final long id)
{
- super(queueID);
+ this.recordType = recordType;
+ this.id = id;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeByte(recordType);
+ buffer.writeLong(id);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ recordType = buffer.readByte();
+ id = buffer.readLong();
+ }
}
public static class RefEncoding extends QueueEncoding
@@ -2866,7 +2982,15 @@
switch (rec)
{
+ case ADD_LARGE_MESSAGE_PENDING:
+ {
+ PendingLargeMessageEncoding lmEncoding = new PendingLargeMessageEncoding();
+ lmEncoding.decode(buffer);
+
+ return lmEncoding;
+ }
case ADD_LARGE_MESSAGE:
+ {
LargeServerMessage largeMessage = new LargeServerMessageImpl(null);
@@ -2875,19 +2999,20 @@
messageEncoding.decode(buffer);
return new MessageDescribe(largeMessage);
-
+ }
case ADD_MESSAGE:
+ {
ServerMessage message = new ServerMessageImpl(rec, 50);
message.decode(buffer);
return new MessageDescribe(message);
-
+ }
case ADD_REF:
{
final RefEncoding encoding = new RefEncoding();
encoding.decode(buffer);
- return new ReferenceDescribe(encoding);
+ return encoding;
}
case ACKNOWLEDGE_REF:
@@ -3341,5 +3466,90 @@
journal.stop();
}
+
+ private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
+ { // Adds recordID to the TXLargeMessageConfirmationOperation kept on tx, creating and storing it on first use
+ TXLargeMessageConfirmationOperation txoper = (TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
+ if (txoper == null) // nothing stored under LARGE_MESSAGE_CONFIRMATIONS on this tx yet
+ {
+ txoper = new TXLargeMessageConfirmationOperation();
+ tx.putProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS, txoper);
+ }
+ txoper.confirmedMessages.add(recordID); // read back by TXLargeMessageConfirmationOperation.afterRollback
+ }
+
+
+
+ class TXLargeMessageConfirmationOperation implements TransactionOperation
+ {
+ // Transaction operation that, after a rollback, confirms the pending large message records collected in confirmedMessages
+ public List<Long> confirmedMessages = new LinkedList<Long>(); // record ids passed to confirmPendingLargeMessage on rollback
+ /* (non-Javadoc) No-op.
+ * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc) No-op.
+ * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterPrepare(Transaction tx)
+ {
+ }
+
+ /* (non-Javadoc) No-op.
+ * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc) No-op.
+ * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterCommit(Transaction tx)
+ {
+ }
+
+ /* (non-Javadoc) No-op.
+ * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforeRollback(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc) Confirms every collected record id; a failure is logged as a warning and the loop moves on to the next id.
+ * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterRollback(Transaction tx)
+ {
+ for (Long msg : confirmedMessages)
+ {
+ try
+ {
+ JournalStorageManager.this.confirmPendingLargeMessage(msg);
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error while confirming large message completion on rollback for recordID=" + msg +
+ "->" +
+ e.getMessage(),
+ e);
+ }
+ }
+ }
+
+ /* (non-Javadoc) Always returns null.
+ * @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
+ */
+ public List<MessageReference> getRelatedMessageReferences()
+ {
+ return null;
+ }
+
+ }
+
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -51,6 +51,8 @@
private LargeServerMessage linkMessage;
+ private long pendingRecordID = -1;
+
private boolean paged;
// We should only use the NIO implementation on the Journal
@@ -87,6 +89,19 @@
// Public --------------------------------------------------------
+ /**
+ * @param pendingRecordID record id of this message's pending-large-message record; negative (the initial -1) means there is none
+ */
+ public void setPendingRecordID(long pendingRecordID)
+ {
+ this.pendingRecordID = pendingRecordID;
+ }
+
+ public long getPendingRecordID()
+ {
+ return this.pendingRecordID;
+ }
+
public void setPaged()
{
paged = true;
@@ -228,7 +243,12 @@
{
validateFile();
releaseResources();
- storageManager.deleteFile(file);
+ storageManager.deleteLargeMessage(file);
+ if (pendingRecordID >= 0)
+ {
+ storageManager.confirmPendingLargeMessage(pendingRecordID);
+ pendingRecordID = -1;
+ }
}
public boolean isFileExists() throws Exception
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -164,7 +164,22 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.LargeServerMessage#setPendingRecordID(long)
+ */
+ public void setPendingRecordID(long pendingRecordID)
+ {
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.LargeServerMessage#getPendingRecordID()
+ */
+ public long getPendingRecordID()
+ {
+ return -1;
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -16,6 +16,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
@@ -45,6 +46,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.transaction.Transaction;
/**
*
@@ -296,7 +298,8 @@
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
Map<Long, QueueBindingInfo> queueInfos,
- final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+ final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+ Set<Pair<Long, Long>> pendingLM) throws Exception
{
return new JournalLoadInformation();
}
@@ -569,4 +572,18 @@
}
-}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessageTX(org.hornetq.core.transaction.Transaction, long, long)
+ */
+ public void confirmPendingLargeMessageTX(Transaction transaction, long messageID, long recordID) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessage(long)
+ */
+ public void confirmPendingLargeMessage(long recordID) throws Exception
+ {
+ }
+
+ }
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -47,6 +47,7 @@
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueInfo;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
@@ -931,6 +932,10 @@
if (store.page(message, context, entry.getValue()))
{
+ if (message.isLargeMessage())
+ {
+ confirmLargeMessageSend(tx, message);
+ }
// We need to kick delivery so the Queues may check for the cursors case they are empty
schedulePageDelivery(tx, entry);
@@ -984,6 +989,11 @@
{
storageManager.storeMessage(message);
}
+
+ if (message.isLargeMessage())
+ {
+ confirmLargeMessageSend(tx, message);
+ }
}
if (tx != null)
@@ -1040,6 +1050,31 @@
}
/**
+ * @param tx transaction the send belongs to, or null for a non-transactional send
+ * @param message message that was just stored or paged; cast to LargeServerMessage, so callers check isLargeMessage() first
+ * @throws Exception propagated from the storage manager's confirmPendingLargeMessage / confirmPendingLargeMessageTX
+ */
+ private void confirmLargeMessageSend(Transaction tx, final ServerMessage message) throws Exception
+ {
+ LargeServerMessage largeServerMessage = (LargeServerMessage)message;
+ if (largeServerMessage.getPendingRecordID() >= 0) // a negative id means no pending record is attached
+ {
+ if (tx == null)
+ {
+ storageManager.confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
+ }
+ else
+ {
+ // transactional send: pass the transaction along with the message id and the pending record id
+ storageManager.confirmPendingLargeMessageTX(tx,
+ largeServerMessage.getMessageID(),
+ largeServerMessage.getPendingRecordID());
+ }
+ largeServerMessage.setPendingRecordID(-1); // reset so the >= 0 check above skips this message next time
+ }
+ }
+
+ /**
* This will kick a delivery async on the queue, so the queue may have a chance to depage messages
* @param tx
* @param entry
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -28,6 +28,10 @@
/** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we specify a link between the messages */
void setLinkedMessage(LargeServerMessage message);
+
+ void setPendingRecordID(long pendingRecordID);
+
+ long getPendingRecordID();
boolean isFileExists() throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -92,6 +92,7 @@
import org.hornetq.core.server.Divert;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MemoryManager;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.Queue;
@@ -1545,13 +1546,16 @@
}
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
+
+ HashSet<Pair<Long, Long>> pendingLargeMessages = new HashSet<Pair<Long, Long>>();
journalInfo[1] = storageManager.loadMessageJournal(postOffice,
pagingManager,
resourceManager,
queues,
queueBindingInfosMap,
- duplicateIDMap);
+ duplicateIDMap,
+ pendingLargeMessages);
for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet())
{
@@ -1564,6 +1568,16 @@
cache.load(entry.getValue());
}
}
+
+ for (Pair<Long, Long> msgToDelete : pendingLargeMessages)
+ {
+ log.info("Deleting pending large message as it wasn't completed:" + msgToDelete);
+ LargeServerMessage msg = storageManager.createLargeMessage();
+ msg.setMessageID(msgToDelete.b);
+ msg.setPendingRecordID(msgToDelete.a);
+ msg.setDurable(true);
+ msg.deleteFile();
+ }
return journalInfo;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -1292,6 +1292,11 @@
// Public
// ----------------------------------------------------------------------------
+
+ public void clearLargeMessage()
+ {
+ currentLargeMessage = null;
+ }
// Private
// ----------------------------------------------------------------------------
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -24,6 +24,8 @@
*/
public class TransactionPropertyIndexes
{
+
+ public static final int LARGE_MESSAGE_CONFIRMATIONS = 1;
public static final int PAGE_SYNC = 2;
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -0,0 +1,539 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientConsumerInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerSessionImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
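+ * An InterruptedLargeMessageTest: checks large message file cleanup across interrupted sends, rollbacks and server restarts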
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ * Created 29-Sep-08 4:04:10 PM
+ *
+ *
+ */
+public class InterruptedLargeMessageTest extends LargeMessageTestBase
+{
+ // Constants -----------------------------------------------------
+
+ final static int RECEIVE_WAIT_TIME = 60000;
+
+ private final int LARGE_MESSAGE_SIZE = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 3;
+
+ // Attributes ----------------------------------------------------
+
+ static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ // Static --------------------------------------------------------
+ private final Logger log = Logger.getLogger(InterruptedLargeMessageTest.class); // log under this test's own category (was LargeMessageTest's)
+
+ protected ServerLocator locator;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = true;
+ clearData();
+ locator = createFactory(isNetty());
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ locator.close();
+ super.tearDown();
+ }
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ public void testInterruptLargeMessageSend() throws Exception
+ {
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = true;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.getConfiguration()
+ .getInterceptorClassNames()
+ .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
+
+ server.start();
+
+ locator.setBlockOnNonDurableSend(false);
+ locator.setBlockOnDurableSend(false);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, true, true);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ clientFile.setExpiration(System.currentTimeMillis());
+
+ producer.send(clientFile);
+
+ Thread.sleep(500);
+
+ for (ServerSession srvSession : server.getSessions())
+ {
+ ((ServerSessionImpl)srvSession).clearLargeMessage();
+ }
+
+ server.stop(false);
+ server.start();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public void testSendNonPersistentQueue() throws Exception
+ {
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ // server.getConfiguration()
+ // .getInterceptorClassNames()
+ // .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
+
+ server.start();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, true, true);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, false);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+ }
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ session.start();
+
+ for (int h = 0; h < 5; h++)
+ {
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage clientMessage = cons.receive(5000);
+ assertNotNull(clientMessage);
+ for (int countByte = 0; countByte < messageSize; countByte++)
+ {
+ assertEquals(getSamplebyte(countByte), clientMessage.getBodyBuffer().readByte());
+ }
+ clientMessage.acknowledge();
+ }
+ session.rollback();
+ }
+
+ server.stop(false);
+ server.start();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public void testSendPaging() throws Exception
+ {
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+ try
+ {
+ server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
+
+ // server.getConfiguration()
+ // .getInterceptorClassNames()
+ // .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
+
+ server.start();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, true, true);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ server.getPagingManager().getPageStore(ADDRESS).startPaging();
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+ }
+ session.commit();
+
+ for (int h = 0; h < 5; h++)
+ {
+ session.close();
+
+ sf.close();
+
+ server.stop();
+
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false);
+
+ ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage clientMessage = cons.receive(5000);
+
+ System.out.println("msg " + clientMessage);
+ assertNotNull(clientMessage);
+ for (int countByte = 0; countByte < messageSize; countByte++)
+ {
+ assertEquals(getSamplebyte(countByte), clientMessage.getBodyBuffer().readByte());
+ }
+ clientMessage.acknowledge();
+ }
+ if (h == 4)
+ {
+ session.commit();
+ }
+ else
+ {
+ session.rollback();
+ }
+ }
+
+ server.stop(false);
+ server.start();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public void testSendPreparedXA() throws Exception
+ {
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+ try
+ {
+ server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
+
+ // server.getConfiguration()
+ // .getInterceptorClassNames()
+ // .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
+
+ server.start();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, false, false);
+
+ Xid xid1 = newXID();
+ Xid xid2 = newXID();
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ session.start(xid1, XAResource.TMNOFLAGS);
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+ clientFile.putIntProperty("txid", 1);
+ producer.send(clientFile);
+ }
+ session.end(xid1, XAResource.TMSUCCESS);
+
+ session.prepare(xid1);
+
+
+ session.start(xid2, XAResource.TMNOFLAGS);
+
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+ clientFile.putIntProperty("txid", 2);
+ producer.send(clientFile);
+ }
+ session.end(xid2, XAResource.TMSUCCESS);
+
+ session.prepare(xid2);
+
+ session.close();
+ sf.close();
+
+ server.stop(false);
+ server.start();
+
+ for (int start = 0 ; start < 2; start++)
+ {
+
+ sf = locator.createSessionFactory();
+
+ if (start == 0)
+ {
+ session = sf.createSession(true, false, false);
+ session.commit(xid1, false);
+ session.close();
+ }
+
+ session = sf.createSession(false, false, false);
+ ClientConsumer cons1 = session.createConsumer(ADDRESS);
+ session.start();
+ for (int i = 0 ; i < 10; i++)
+ {
+ ClientMessage msg = cons1.receive(5000);
+ assertNotNull(msg);
+ assertEquals(1, msg.getIntProperty("txid").intValue());
+ msg.acknowledge();
+ }
+
+ if (start == 1)
+ {
+ session.commit();
+ }
+ else
+ {
+ session.rollback();
+ }
+
+ session.close();
+ sf.close();
+
+ server.stop();
+ server.start();
+ }
+ server.stop();
+
+ validateNoFilesOnLargeDir(10);
+
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, false, false);
+ session.rollback(xid2);
+
+ sf.close();
+
+ server.stop();
+ server.start();
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public static class LargeMessageTestInterceptorIgnoreLastPacket implements Interceptor
+ {
+ // Test interceptor: while interruptMessages is true, intercept returns false for the last packet of a continuation send
+ public static boolean interruptMessages = false; // static switch, set by the tests before they start the server
+
+ /* (non-Javadoc) Returns false only for a SessionSendContinuationMessage with isContinues() == false while interruptMessages is true; true otherwise.
+ * @see org.hornetq.api.core.Interceptor#intercept(org.hornetq.core.protocol.core.Packet, org.hornetq.spi.core.protocol.RemotingConnection)
+ */
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (packet instanceof SessionSendContinuationMessage)
+ {
+ SessionSendContinuationMessage msg = (SessionSendContinuationMessage)packet;
+ if (!msg.isContinues() && interruptMessages)
+ {
+ System.out.println("Ignored a message");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ }
+
+}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -2973,15 +2973,11 @@
super.setUp();
clearData();
locator = createFactory(isNetty());
- log.info("\n*********************************************************************************\n Starting " + getName() +
- "\n*********************************************************************************");
}
@Override
protected void tearDown() throws Exception
{
- log.info("\n*********************************************************************************\nDone with " + getName() +
- "\n*********************************************************************************");
locator.close();
super.tearDown();
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -76,7 +76,7 @@
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
- journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null);
+ journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null, null);
assertEquals(98, deletedMessage.size());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -101,7 +101,7 @@
Map<Long, Queue> queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(postOffice, null, null, queues, null, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null, null, null);
journal.stop();
@@ -111,7 +111,7 @@
queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(postOffice, null, null, queues, null, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null, null, null);
queueBindingInfos = new ArrayList<QueueBindingInfo>();
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -128,7 +128,7 @@
Map<Long, Queue> queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null);
+ journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null, null);
}
/**
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -18,6 +18,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -72,6 +73,7 @@
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.transaction.Transaction;
import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.RandomUtil;
@@ -1231,7 +1233,8 @@
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
Map<Long, QueueBindingInfo> queueInfos,
- final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+ final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
+ Set<Pair<Long, Long>> pendingLargeMessages) throws Exception
{
return new JournalLoadInformation();
}
@@ -1680,8 +1683,22 @@
}
- }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessageTX(org.hornetq.core.transaction.Transaction, long, long)
+ */
+ public void confirmPendingLargeMessageTX(Transaction transaction, long messageID, long recordID) throws Exception
+ {
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessage(long)
+ */
+ public void confirmPendingLargeMessage(long recordID) throws Exception
+ {
+ }
+
+ }
+
class FakeStoreFactory implements PagingStoreFactory
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -112,7 +112,8 @@
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
null,
- mapDups);
+ mapDups,
+ null);
Assert.assertEquals(0, mapDups.size());
@@ -134,7 +135,8 @@
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
null,
- mapDups);
+ mapDups,
+ null);
Assert.assertEquals(1, mapDups.size());
@@ -163,7 +165,8 @@
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
null,
- mapDups);
+ mapDups,
+ null);
Assert.assertEquals(1, mapDups.size());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-23 16:09:48 UTC (rev 11404)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-24 06:22:26 UTC (rev 11405)
@@ -306,9 +306,12 @@
server = HornetQServers.newHornetQServer(configuration, false);
}
- for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+ if (settings != null)
{
- server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+ for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+ {
+ server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+ }
}
AddressSettings defaultSetting = new AddressSettings();
13 years, 3 months
JBoss hornetq SVN: r11404 - in branches/STOMP11: tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-23 12:09:48 -0400 (Fri, 23 Sep 2011)
New Revision: 11404
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-23 13:53:53 UTC (rev 11403)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-23 16:09:48 UTC (rev 11404)
@@ -13,7 +13,6 @@
package org.hornetq.core.protocol.stomp.v10;
import java.io.UnsupportedEncodingException;
-import java.util.List;
import java.util.Map;
import org.hornetq.api.core.HornetQBuffer;
@@ -27,7 +26,6 @@
import org.hornetq.core.protocol.stomp.StompConnection;
import org.hornetq.core.protocol.stomp.StompDecoder;
import org.hornetq.core.protocol.stomp.StompFrame;
-import org.hornetq.core.protocol.stomp.StompFrame.Header;
import org.hornetq.core.protocol.stomp.StompSubscription;
import org.hornetq.core.protocol.stomp.StompUtils;
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
@@ -53,7 +51,6 @@
@Override
public StompFrame onConnect(StompFrame frame)
{
- log.error("-----------------onConnection ()");
StompFrame response = null;
Map<String, String> headers = frame.getHeadersMap();
String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
@@ -61,10 +58,8 @@
String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
- log.error("------------ validating user: " + login + " code " + passcode);
if (connection.validateUser(login, passcode))
{
- log.error("-------user OK!!!");
connection.setClientID(clientID);
connection.setValid(true);
@@ -84,7 +79,6 @@
}
else
{
- log.error("--------user NOT ok!!");
//not valid
response = new StompFrameV10(Stomp.Responses.ERROR);
response.addHeader(Stomp.Headers.Error.MESSAGE, "Failed to connect");
@@ -95,7 +89,6 @@
catch (UnsupportedEncodingException e)
{
log.error("Encoding problem", e);
- //then we will send a null body message.
}
}
return response;
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-23 13:53:53 UTC (rev 11403)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-23 16:09:48 UTC (rev 11404)
@@ -70,7 +70,7 @@
}
sb.append((char)0);
- String data = new String(sb.toString());
+ String data = sb.toString();
byte[] byteValue = data.getBytes("UTF-8");
@@ -82,6 +82,35 @@
}
@Override
+ public ByteBuffer toByteBufferWithExtra(String str) throws UnsupportedEncodingException
+ {
+ StringBuffer sb = new StringBuffer();
+ sb.append(command + "\n");
+ int n = headers.size();
+ for (int i = 0; i < n; i++)
+ {
+ sb.append(headers.get(i).key + ":" + headers.get(i).val + "\n");
+ }
+ sb.append("\n");
+ if (body != null)
+ {
+ sb.append(body);
+ }
+ sb.append((char)0);
+ sb.append(str);
+
+ String data = sb.toString();
+
+ byte[] byteValue = data.getBytes("UTF-8");
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(byteValue.length);
+ buffer.put(byteValue);
+
+ buffer.rewind();
+ return buffer;
+ }
+
+ @Override
public boolean needsReply()
{
if ("CONNECT".equals(command) || headerKeys.contains(HEADER_RECEIPT))
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-23 13:53:53 UTC (rev 11403)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-23 16:09:48 UTC (rev 11404)
@@ -113,7 +113,38 @@
}
return response;
}
-
+
+ public ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException, InterruptedException
+ {
+ ClientStompFrame response = null;
+ ByteBuffer buffer = frame.toByteBufferWithExtra("\n");
+
+ while (buffer.remaining() > 0)
+ {
+ socketChannel.write(buffer);
+ }
+
+ //now response
+ if (frame.needsReply())
+ {
+ response = receiveFrame();
+
+ //filter out server ping
+ while (response != null)
+ {
+ if (response.getCommand().equals("STOMP"))
+ {
+ response = receiveFrame();
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ return response;
+ }
+
public ClientStompFrame receiveFrame() throws InterruptedException
{
return frameQueue.poll(10, TimeUnit.SECONDS);
@@ -138,9 +169,16 @@
frameBytes[j] = receiveList.get(j);
}
ClientStompFrame frame = factory.createFrame(new String(frameBytes, "UTF-8"));
- frameQueue.offer(frame);
- receiveList.clear();
+ if (validateFrame(frame))
+ {
+ frameQueue.offer(frame);
+ receiveList.clear();
+ }
+ else
+ {
+ receiveList.add(b);
+ }
}
}
else
@@ -152,6 +190,20 @@
readBuffer.rewind();
}
+ private boolean validateFrame(ClientStompFrame f) throws UnsupportedEncodingException
+ {
+ String h = f.getHeader("content-length");
+ if (h != null)
+ {
+ int len = Integer.valueOf(h);
+ if (f.getBody().getBytes("UTF-8").length < len)
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
protected void close() throws IOException
{
socketChannel.close();
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java 2011-09-23 13:53:53 UTC (rev 11403)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java 2011-09-23 16:09:48 UTC (rev 11404)
@@ -39,5 +39,7 @@
public String getHeader(String header);
public String getBody();
+
+ public ByteBuffer toByteBufferWithExtra(String str) throws UnsupportedEncodingException;
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-23 13:53:53 UTC (rev 11403)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-23 16:09:48 UTC (rev 11404)
@@ -48,6 +48,8 @@
void stopPinger();
void destroy();
+
+ ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException, InterruptedException;
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-23 13:53:53 UTC (rev 11403)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-23 16:09:48 UTC (rev 11404)
@@ -22,6 +22,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.jms.BytesMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -1210,7 +1211,104 @@
connV11.disconnect();
}
+ public void testSendMessage() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ // Assert default priority 4 is used when priority header is not set
+ Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
+ public void testSendMessageWithContentLength() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ byte[] data = new byte[] { 1, 0, 0, 4 };
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody(new String(data, "UTF-8"));
+
+ frame.addHeader("content-length", String.valueOf(data.length));
+
+ connV11.sendFrame(frame);
+
+ BytesMessage message = (BytesMessage)consumer.receive(10000);
+ Assert.assertNotNull(message);
+ //there is one extra null byte
+ assertEquals(data.length + 1, message.getBodyLength());
+ assertEquals(data[0], message.readByte());
+ assertEquals(data[1], message.readByte());
+ assertEquals(data[2], message.readByte());
+ assertEquals(data[3], message.readByte());
+ }
+
+ public void testSendMessageWithCustomHeadersAndSelector() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("foo", "abc");
+ frame.addHeader("bar", "123");
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+ }
+
+ public void testSendMessageWithLeadingNewLine() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ connV11.sendWickedFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
//-----------------private help methods
private void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
13 years, 3 months
JBoss hornetq SVN: r11403 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/server and 5 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-23 09:53:53 -0400 (Fri, 23 Sep 2011)
New Revision: 11403
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/Delivery.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/DeliveryImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/HQDeque.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/HQIterator.java
Modified:
branches/HORNETQ-720_Replication/
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java
branches/HORNETQ-720_Replication/tests/pom.xml
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/LinkedListTest.java
Log:
merge from trunk
Property changes on: branches/HORNETQ-720_Replication
___________________________________________________________________
Modified: svn:mergeinfo
- /trunk:10878-11242
+ /trunk:10878-11402
Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/Delivery.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/Delivery.java 2011-09-23 12:22:05 UTC (rev 11402)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/Delivery.java 2011-09-23 13:53:53 UTC (rev 11403)
@@ -1,28 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server;
-
-/**
- *
- * A Delivery
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public interface Delivery
-{
- MessageReference getReference();
-
- long getConsumerID();
-}
Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/DeliveryImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/DeliveryImpl.java 2011-09-23 12:22:05 UTC (rev 11402)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/DeliveryImpl.java 2011-09-23 13:53:53 UTC (rev 11403)
@@ -1,47 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.impl;
-
-import org.hornetq.core.server.Delivery;
-import org.hornetq.core.server.MessageReference;
-
-/**
- *
- * A DeliveryImpl
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public class DeliveryImpl implements Delivery
-{
- private final long consumerID;
-
- private final MessageReference reference;
-
- public DeliveryImpl(final long consumerID, final MessageReference reference)
- {
- this.consumerID = consumerID;
- this.reference = reference;
- }
-
- public long getConsumerID()
- {
- return consumerID;
- }
-
- public MessageReference getReference()
- {
- return reference;
- }
-}
Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/HQDeque.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/HQDeque.java 2011-09-23 12:22:05 UTC (rev 11402)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/HQDeque.java 2011-09-23 13:53:53 UTC (rev 11403)
@@ -1,39 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.utils;
-
-
-/**
- * A HQDeque
- *
- * @author Tim Fox
- *
- *
- */
-public interface HQDeque<T>
-{
- void addFirst(T t);
-
- void addLast(T t);
-
- HQIterator<T> iterator();
-
- boolean isEmpty();
-
- T removeFirst();
-
- T getFirst();
-
- void clear();
-}
Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/HQIterator.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/HQIterator.java 2011-09-23 12:22:05 UTC (rev 11402)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/HQIterator.java 2011-09-23 13:53:53 UTC (rev 11403)
@@ -1,30 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.utils;
-
-/**
- * A HQIterator
- *
- * @author Tim Fox
- *
- *
- */
-public interface HQIterator<E>
-{
- E next();
-
- void remove();
-
- void prev();
-}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2011-09-23 12:22:05 UTC (rev 11402)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2011-09-23 13:53:53 UTC (rev 11403)
@@ -149,10 +149,10 @@
class TestHandler implements MessageHandler
{
- private CountDownLatch latch;
+ private final CountDownLatch latch;
int count = 0;
- private ClientSession clientSession;
+ private final ClientSession clientSession;
public TestHandler(CountDownLatch latch, ClientSession clientSession)
{
@@ -251,10 +251,12 @@
SimpleString dlq = new SimpleString("DLQ1");
clientSession.createQueue(dla, dlq, null, false);
clientSession.createQueue(qName, qName, null, false);
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
- ClientSessionFactory sessionFactory = locator.createSessionFactory();
+ final ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ final ClientSessionFactory sessionFactory = locator.createSessionFactory();
ClientSession sendSession = sessionFactory.createSession(false, true, true);
- ClientProducer producer = sendSession.createProducer(qName);
+ try
+ {
+ ClientProducer producer = sendSession.createProducer(qName);
Map<String, Long> origIds = new HashMap<String, Long>();
for (int i = 0; i < NUM_MESSAGES; i++)
@@ -320,7 +322,10 @@
}
sendSession.close();
-
+ } finally {
+ sessionFactory.close();
+ locator.close();
+ }
}
public void testDeadlLetterAddressWithDefaultAddressSettings() throws Exception
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java 2011-09-23 12:22:05 UTC (rev 11402)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java 2011-09-23 13:53:53 UTC (rev 11403)
@@ -26,7 +26,7 @@
import org.hornetq.tests.util.UnitTestCase;
/**
- *
+ *
* A SessionCloseOnGCTest
*
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
@@ -57,7 +57,7 @@
protected void tearDown() throws Exception
{
//locator.close();
-
+
server.stop();
server = null;
@@ -269,12 +269,13 @@
public void testCloseOneSessionOnGC() throws Exception
{
- ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl) locator.createSessionFactory();
+ final ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl)locator.createSessionFactory();
- ClientSession session = sf.createSession(false, true, true);
+ try
+ {
+ ClientSession session = sf.createSession(false, true, true);
+ WeakReference<ClientSession> wses = new WeakReference<ClientSession>(session);
- WeakReference<ClientSession> wses = new WeakReference<ClientSession>(session);
-
Assert.assertEquals(1, server.getRemotingService().getConnections().size());
session = null;
@@ -284,12 +285,18 @@
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(1, sf.numConnections());
Assert.assertEquals(1, server.getRemotingService().getConnections().size());
+ }
+ finally
+ {
+ sf.close();
+ }
}
public void testCloseSeveralSessionOnGC() throws Exception
{
- ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl) locator.createSessionFactory();
-
+ final ClientSessionFactoryImpl sf = (ClientSessionFactoryImpl)locator.createSessionFactory();
+ try
+ {
ClientSession session1 = sf.createSession(false, true, true);
ClientSession session2 = sf.createSession(false, true, true);
ClientSession session3 = sf.createSession(false, true, true);
@@ -309,6 +316,11 @@
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(1, sf.numConnections());
Assert.assertEquals(1, server.getRemotingService().getConnections().size());
+
+ }
+ finally
+ {
+ sf.close();
+ }
}
-
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java 2011-09-23 12:22:05 UTC (rev 11402)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionCloseTest.java 2011-09-23 13:53:53 UTC (rev 11403)
@@ -21,11 +21,14 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.tests.util.RandomUtil;
@@ -49,6 +52,8 @@
private ClientSessionFactory sf;
+ private ServerLocator locator;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -252,7 +257,9 @@
server.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ locator =
+ HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(
+ UnitTestCase.INVM_CONNECTOR_FACTORY));
sf = locator.createSessionFactory();
}
@@ -270,8 +277,13 @@
server.stop();
}
+ if (locator != null)
+ {
+ locator.close();
+ }
+
+ locator = null;
sf = null;
-
server = null;
super.tearDown();
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java 2011-09-23 12:22:05 UTC (rev 11402)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionFactoryTest.java 2011-09-23 13:53:53 UTC (rev 11403)
@@ -38,7 +38,7 @@
import org.hornetq.tests.util.ServiceTestBase;
/**
- *
+ *
* A ClientSessionFactoryTest
*
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
@@ -50,7 +50,7 @@
{
private static final Logger log = Logger.getLogger(SessionFactoryTest.class);
- private DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(getUDPDiscoveryAddress(), getUDPDiscoveryPort());
+ private final DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(getUDPDiscoveryAddress(), getUDPDiscoveryPort());
private HornetQServer liveService;
@@ -101,16 +101,22 @@
Assert.assertNotNull(csi);
csi.close();
-
+
locator.close();
}
public void testCloseUnusedClientSessionFactoryWithoutGlobalPools() throws Exception
{
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(liveTC);
-
- ClientSessionFactory csf = locator.createSessionFactory();
- csf.close();
+ try
+ {
+ ClientSessionFactory csf = locator.createSessionFactory();
+ csf.close();
+ }
+ finally
+ {
+ locator.close();
+ }
}
public void testDiscoveryConstructor() throws Exception
@@ -141,15 +147,15 @@
HornetQClient.DEFAULT_RETRY_INTERVAL,
HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
HornetQClient.DEFAULT_RECONNECT_ATTEMPTS);
-
+
ClientSessionFactory cf = locator.createSessionFactory();
ClientSession session = cf.createSession(false, true, true);
Assert.assertNotNull(session);
session.close();
testSettersThrowException(cf);
-
+
cf.close();
-
+
locator.close();
}
@@ -182,13 +188,13 @@
HornetQClient.DEFAULT_RETRY_INTERVAL,
HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
HornetQClient.DEFAULT_RECONNECT_ATTEMPTS);
-
+
ClientSessionFactory cf = locator.createSessionFactory();
ClientSession session = cf.createSession(false, true, true);
Assert.assertNotNull(session);
session.close();
testSettersThrowException(cf);
-
+
cf.close();
}
Modified: branches/HORNETQ-720_Replication/tests/pom.xml
===================================================================
--- branches/HORNETQ-720_Replication/tests/pom.xml 2011-09-23 12:22:05 UTC (rev 11402)
+++ branches/HORNETQ-720_Replication/tests/pom.xml 2011-09-23 13:53:53 UTC (rev 11403)
@@ -12,10 +12,6 @@
<artifactId>hornetq-tests-pom</artifactId>
<packaging>pom</packaging>
- <properties>
- <skipUnitTests>true</skipUnitTests>
- </properties>
-
<build>
<plugins>
<plugin>
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/LinkedListTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/LinkedListTest.java 2011-09-23 12:22:05 UTC (rev 11402)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/LinkedListTest.java 2011-09-23 13:53:53 UTC (rev 11403)
@@ -57,6 +57,7 @@
payload = new byte[10 * 1024];
}
+ @Override
protected void finalize() throws Exception
{
count.decrementAndGet();
@@ -112,10 +113,8 @@
assertEquals(1000, count.get());
- int removed = 0;
while (iter.hasNext())
{
- System.out.println("removed " + (removed++));
iter.next();
iter.remove();
}
13 years, 3 months
JBoss hornetq SVN: r11402 - trunk.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-23 08:22:05 -0400 (Fri, 23 Sep 2011)
New Revision: 11402
Modified:
trunk/.gitignore
Log:
Update .gitignore file
Modified: trunk/.gitignore
===================================================================
--- trunk/.gitignore 2011-09-23 12:21:52 UTC (rev 11401)
+++ trunk/.gitignore 2011-09-23 12:22:05 UTC (rev 11402)
@@ -2,6 +2,9 @@
tests/jms-tests/data
org.eclipse.jdt.core.prefs
org.eclipse.jdt.ui.prefs
+org.eclipse.m2e.core.prefs
+*/.settings/org.eclipse.*
+*/*/.settings/org.eclipse.*
!etc/org.eclipse.jdt.ui.prefs
!etc/org.eclipse.jdt.core.prefs
org.maven.ide.eclipse.prefs
13 years, 3 months
JBoss hornetq SVN: r11401 - trunk/tests.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-23 08:21:52 -0400 (Fri, 23 Sep 2011)
New Revision: 11401
Modified:
trunk/tests/pom.xml
Log:
remove misplaced property setting from pom.xml
Modified: trunk/tests/pom.xml
===================================================================
--- trunk/tests/pom.xml 2011-09-23 12:21:30 UTC (rev 11400)
+++ trunk/tests/pom.xml 2011-09-23 12:21:52 UTC (rev 11401)
@@ -12,10 +12,6 @@
<artifactId>hornetq-tests-pom</artifactId>
<packaging>pom</packaging>
- <properties>
- <skipUnitTests>true</skipUnitTests>
- </properties>
-
<build>
<plugins>
<plugin>
13 years, 3 months