JBoss hornetq SVN: r11300 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/deployers/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-09-07 10:16:34 -0400 (Wed, 07 Sep 2011)
New Revision: 11300
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6035
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java 2011-09-07 13:38:25 UTC (rev 11299)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/deployers/impl/FileDeploymentManagerTest.java 2011-09-07 14:16:34 UTC (rev 11300)
@@ -80,7 +80,9 @@
fdm.start();
try
{
- Assert.assertEquals(file.toURI().toURL(), deployer.deployedUrl);
+ URL expected = file.toURI().toURL();
+ URL deployedUrl = deployer.deployedUrl;
+ Assert.assertTrue(expected.toString().equalsIgnoreCase(deployedUrl.toString()));
deployer.deployedUrl = null;
fdm.start();
Assert.assertNull(deployer.deployedUrl);
@@ -112,7 +114,9 @@
try
{
fdm.registerDeployer(deployer);
- Assert.assertEquals(file.toURI().toURL(), deployer.deployedUrl);
+ URL expected = file.toURI().toURL();
+ URL deployedUrl = deployer.deployedUrl;
+ Assert.assertTrue(expected.toString().equalsIgnoreCase(deployedUrl.toString()));
deployer.deployedUrl = null;
fdm.start();
Assert.assertNull(deployer.deployedUrl);
@@ -254,7 +258,9 @@
Assert.assertEquals(1, fdm.getDeployers().size());
Assert.assertTrue(fdm.getDeployers().contains(deployer));
Assert.assertEquals(1, fdm.getDeployed().size());
- Assert.assertEquals(file.toURI().toURL(), deployer.reDeployedUrl);
+ URL expected = file.toURI().toURL();
+ URL deployedUrl = deployer.deployedUrl;
+ Assert.assertTrue(expected.toString().equalsIgnoreCase(deployedUrl.toString()));
Pair<URL, Deployer> pair = new Pair<URL, Deployer>(url, deployer);
Assert.assertEquals(oldLastModified + 1000, fdm.getDeployed().get(pair).lastModified);
deployer.reDeployedUrl = null;
13 years, 4 months
JBoss hornetq SVN: r11299 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl and 13 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-07 09:38:25 -0400 (Wed, 07 Sep 2011)
New Revision: 11299
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncPagingTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/FakePagingManager.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-720 Synchronization of Pages for replication
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -15,11 +15,12 @@
import java.util.List;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.paging.cursor.LivePageCache;
import org.hornetq.core.persistence.StorageManager;
/**
- *
+ *
* @see PagingManager
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
@@ -31,7 +32,7 @@
void write(PagedMessage message) throws Exception;
List<PagedMessage> read(StorageManager storage) throws Exception;
-
+
void setLiveCache(LivePageCache pageCache);
int getSize();
@@ -45,4 +46,6 @@
void close() throws Exception;
boolean delete(PagedMessage[] messages) throws Exception;
+
+ SequentialFile getFile();
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingManager.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingManager.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -17,37 +17,32 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.postoffice.Address;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.settings.HierarchicalRepositoryChangeListener;
/**
- *
- *
-<PRE>
-
-+------------+ 1 +-------------+ N +------------+ N +-------+ 1 +----------------+
-| {@link PostOffice} |-------> |PagingManager|-------> |{@link PagingStore} | ------> | {@link Page} | ------> | {@link SequentialFile} |
-+------------+ +-------------+ +------------+ +-------+ +----------------+
- | 1 ^
- | |
- | |
- | | 1
- | N +---------+
- +--------> | Address |
- +---------+
-
-</PRE>
-
- *
+ * <PRE>
+ *
+ * +------------+ 1 +-------------+ N +------------+ N +-------+ 1 +----------------+
+ * | {@link PostOffice} |-------> |{@link PagingManager}|-------> |{@link PagingStore} | ------> | {@link Page} | ------> | {@link SequentialFile} |
+ * +------------+ +-------------+ +------------+ +-------+ +----------------+
+ * | 1 ^
+ * | |
+ * | |
+ * | | 1
+ * | N +---------+ /
+ * +--------> | {@link Address} |
+ * +---------+
+ * </PRE>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
- *
*/
public interface PagingManager extends HornetQComponent, HierarchicalRepositoryChangeListener
{
- /** To return the PageStore associated with the address */
+ /** Returns the PageStore associated with the address. A new page store is created if necessary. */
PagingStore getPageStore(SimpleString address) throws Exception;
/** An injection point for the PostOffice to inject itself */
@@ -67,18 +62,30 @@
* @param transactionID
*/
void removeTransaction(long transactionID);
-
+
Map<Long, PageTransactionInfo> getTransactions();
/**
* Reload previously created PagingStores into memory
- * @throws Exception
+ * @throws Exception
*/
void reloadStores() throws Exception;
SimpleString[] getStoreNames();
void deletePageStore(SimpleString storeName) throws Exception;
-
+
void processReload() throws Exception;
+
+ /**
+ * Lock the manager, and all its {@link PagingStore}s. This method should not be called during
+ * normal PagingManager usage.
+ */
+ void lockAll();
+
+ /**
+ * Unlock the manager.
+ * @see #lockAll()
+ */
+ void unlockAll();
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -13,8 +13,12 @@
package org.hornetq.core.paging;
+import java.util.Collection;
+
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
@@ -23,46 +27,46 @@
import org.hornetq.core.settings.impl.AddressSettings;
/**
- *
- * <p>The implementation will take care of details such as PageSize.</p>
- * <p>The producers will write directly to PagingStore and that will decide what
- * Page file should be used based on configured size</p>
- *
+ * <p>
+ * The implementation will take care of details such as PageSize.
+ * </p>
+ * <p>
+ * The producers will write directly to PagingStore, and the store will decide what Page file should
+ * be used based on configured size.
+ * </p>
* @see PagingManager
-
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- *
*/
public interface PagingStore extends HornetQComponent
{
SimpleString getAddress();
int getNumberOfPages();
-
+
// The current page in which the system is writing files
int getCurrentWritingPage();
SimpleString getStoreName();
AddressFullMessagePolicy getAddressFullMessagePolicy();
-
+
long getFirstPage();
-
+
long getTopPage();
long getPageSizeBytes();
long getAddressSize();
-
+
long getMaxSize();
-
+
void applySetting(AddressSettings addressSettings);
boolean isPaging();
// It will schedule sync to the file storage
void sync() throws Exception;
-
+
// It will perform a real sync on the current IO file
void ioSync() throws Exception;
@@ -71,23 +75,23 @@
boolean page(ServerMessage message, RoutingContext ctx, RouteContextList listCtx) throws Exception;
Page createPage(final int page) throws Exception;
-
- boolean checkPage(final int page) throws Exception;
-
+
+ boolean checkPageFileExists(final int page) throws Exception;
+
PagingManager getPagingManager();
-
+
PageCursorProvider getCursorProvier();
-
+
void processReload() throws Exception;
-
- /**
+
+ /**
* Remove the first page from the Writing Queue.
- * The file will still exist until Page.delete is called,
+ * The file will still exist until Page.delete is called,
* So, case the system is reloaded the same Page will be loaded back if delete is not called.
*
* @throws Exception
- *
- * Note: This should still be part of the interface, even though HornetQ only uses through the
+ *
+ * Note: This should still be part of the interface, even though HornetQ only uses through the
*/
Page depage() throws Exception;
@@ -103,20 +107,37 @@
void stopPaging() throws Exception;
void addSize(int size);
-
+
void executeRunnableWhenMemoryAvailable(Runnable runnable);
-
+
/** This method will hold and producer, but it wait operations to finish before locking (write lock) */
void lock();
-
- /**
- *
+
+ /**
+ *
* Call this method using the same thread used by the last call of {@link PagingStore#lock()}
- *
+ *
*/
void unlock();
/** This is used mostly by tests.
* We will wait any pending runnable to finish its execution */
void flushExecutors();
+
+ /**
+ * Files to synchronize with backup.
+ * @return
+ * @throws Exception
+ */
+ Collection<Integer> getCurrentIds() throws Exception;
+
+ /**
+ * Sends the pages with given ids to the replicator.
+ * <p>
+ * Sending is done here to avoid exposing the internal {@link SequentialFile}s.
+ * @param replicator
+ * @param pageIds
+ * @throws Exception
+ */
+ void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -150,7 +150,7 @@
cache = softCache.get(pageId);
if (cache == null)
{
- if (!pagingStore.checkPage((int)pageId))
+ if (!pagingStore.checkPageFileExists((int)pageId))
{
return null;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -32,7 +32,7 @@
import org.hornetq.utils.DataConstants;
/**
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
@@ -59,7 +59,7 @@
private final SequentialFile file;
private final SequentialFileFactory fileFactory;
-
+
/**
* The page cache that will be filled with data as we write more data
*/
@@ -96,7 +96,7 @@
{
return pageId;
}
-
+
public void setLiveCache(LivePageCache pageCache)
{
this.pageCache = pageCache;
@@ -109,7 +109,7 @@
size.set((int)file.size());
// Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
ByteBuffer buffer2 = ByteBuffer.allocateDirect(size.get());
-
+
file.position(0);
file.read(buffer2);
@@ -181,7 +181,7 @@
buffer.rewind();
file.writeDirect(buffer, false);
-
+
if (pageCache != null)
{
pageCache.addLiveMessage(message);
@@ -234,7 +234,7 @@
if (msg.getMessage().isLargeMessage())
{
LargeServerMessage lmsg = (LargeServerMessage)msg.getMessage();
-
+
// Remember, cannot call delete directly here
// Because the large-message may be linked to another message
// or it may still being delivered even though it has been acked already
@@ -257,7 +257,7 @@
{
file.delete();
}
-
+
return true;
}
catch (Exception e)
@@ -276,13 +276,14 @@
{
return size.intValue();
}
-
+
+ @Override
public String toString()
{
return "PageImpl::pageID=" + this.pageId + ", file=" + this.file;
}
-
+
/* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
@@ -339,5 +340,11 @@
suspiciousRecords = true;
}
+ @Override
+ public SequentialFile getFile()
+ {
+ return file;
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -24,9 +24,9 @@
import org.hornetq.utils.DataConstants;
/**
- *
+ *
* This class represents a paged message
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
*
@@ -45,11 +45,14 @@
// Public --------------------------------------------------------
- /** Large messages will need to be instatiated lazily during getMessage when the StorageManager is available */
+ /**
+ * Large messages will need to be instantiated lazily during getMessage when the StorageManager
+ * is available
+ */
private byte[] largeMessageLazyData;
private ServerMessage message;
-
+
private long queueIDs[];
private long transactionID = 0;
@@ -74,17 +77,17 @@
{
return message;
}
-
+
public void initMessage(StorageManager storage)
{
if (largeMessageLazyData != null)
{
LargeServerMessage lgMessage = storage.createLargeMessage();
- message = lgMessage;
HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(largeMessageLazyData);
- message.decodeHeadersAndProperties(buffer);
+ lgMessage.decodeHeadersAndProperties(buffer);
lgMessage.incrementDelayDeletionCount();
lgMessage.setPaged();
+ message = lgMessage;
largeMessageLazyData = null;
}
}
@@ -93,7 +96,7 @@
{
return transactionID;
}
-
+
public long[] getQueueIDs()
{
return queueIDs;
@@ -123,11 +126,11 @@
message.decode(buffer);
}
-
+
int queueIDsSize = buffer.readInt();
-
+
queueIDs = new long[queueIDsSize];
-
+
for (int i = 0 ; i < queueIDsSize; i++)
{
queueIDs[i] = buffer.readLong();
@@ -143,18 +146,18 @@
buffer.writeInt(message.getEncodeSize());
message.encode(buffer);
-
+
buffer.writeInt(queueIDs.length);
-
- for (int i = 0 ; i < queueIDs.length; i++)
+
+ for (long queueID : queueIDs)
{
- buffer.writeLong(queueIDs[i]);
+ buffer.writeLong(queueID);
}
}
public int getEncodeSize()
{
- return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize() +
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize() +
DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -18,6 +18,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
@@ -31,7 +32,7 @@
import org.hornetq.core.settings.impl.AddressSettings;
/**
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
@@ -45,6 +46,13 @@
private volatile boolean started = false;
+ /**
+ * Lock used at the start of synchronization between a live server and its backup.
+ * Synchronization will lock all {@link PagingStore} instances, and so any operation here that
+ * requires a lock on a {@link PagingStore} instance needs to take a read-lock on
+ * {@link #syncLock} to avoid dead-locks.
+ */
+ private final ReentrantReadWriteLock syncLock = new ReentrantReadWriteLock();
private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<SimpleString, PagingStore>();
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
@@ -53,7 +61,8 @@
private final StorageManager storageManager;
- private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
+ private final ConcurrentMap<Long, PageTransactionInfo> transactions =
+ new ConcurrentHashMap<Long, PageTransactionInfo>();
// Static
// --------------------------------------------------------------------------------------------------------------------------
@@ -76,9 +85,9 @@
// Public
// ---------------------------------------------------------------------------------------------------------------------------
-
+
// Hierarchical changes listener
-
+
/* (non-Javadoc)
* @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
*/
@@ -87,60 +96,58 @@
reaplySettings();
}
-
-
// PagingManager implementation
// -----------------------------------------------------------------------------------------------------
public void reaplySettings()
{
- for (PagingStore store : stores.values())
+ for (PagingStore store : stores.values())
{
AddressSettings settings = this.addressSettingsRepository.getMatch(store.getAddress().toString());
store.applySetting(settings);
}
}
-
+
public SimpleString[] getStoreNames()
{
Set<SimpleString> names = stores.keySet();
return names.toArray(new SimpleString[names.size()]);
}
- public synchronized void reloadStores() throws Exception
+ public void reloadStores() throws Exception
{
- List<PagingStore> reloadedStores = pagingStoreFactory.reloadStores(addressSettingsRepository);
+ lock();
+ try
+ {
+ List<PagingStore> reloadedStores = pagingStoreFactory.reloadStores(addressSettingsRepository);
- for (PagingStore store : reloadedStores)
+ for (PagingStore store : reloadedStores)
+ {
+ store.start();
+ stores.put(store.getStoreName(), store);
+ }
+ }
+ finally
{
- store.start();
- stores.put(store.getStoreName(), store);
+ unlock();
}
}
- private synchronized PagingStore createPageStore(final SimpleString storeName) throws Exception
+ public void deletePageStore(final SimpleString storeName) throws Exception
{
- PagingStore store = stores.get(storeName);
-
- if (store == null)
+ syncLock.readLock().lock();
+ try
{
- store = newStore(storeName);
-
- store.start();
-
- stores.put(storeName, store);
+ PagingStore store = stores.remove(storeName);
+ if (store != null)
+ {
+ store.stop();
+ }
}
-
- return store;
- }
-
- public void deletePageStore(final SimpleString storeName) throws Exception
- {
- PagingStore store = stores.remove(storeName);
- if (store != null)
+ finally
{
- store.stop();
+ syncLock.readLock().unlock();
}
}
@@ -149,12 +156,11 @@
{
PagingStore store = stores.get(storeName);
- if (store == null)
+ if (store != null)
{
- store = createPageStore(storeName);
+ return store;
}
-
- return store;
+ return newStore(storeName);
}
/** this will be set by the postOffice itself.
@@ -179,7 +185,7 @@
{
return transactions.get(id);
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.PagingManager#getTransactions()
*/
@@ -197,37 +203,53 @@
return started;
}
- public synchronized void start() throws Exception
+ public void start() throws Exception
{
- if (started)
+ lock();
+ try
{
- return;
- }
+ if (started)
+ {
+ return;
+ }
- pagingStoreFactory.setPagingManager(this);
+ pagingStoreFactory.setPagingManager(this);
- pagingStoreFactory.setStorageManager(storageManager);
+ pagingStoreFactory.setStorageManager(storageManager);
- reloadStores();
+ reloadStores();
- started = true;
+ started = true;
+ }
+ finally
+ {
+ unlock();
+ }
}
- public synchronized void stop() throws Exception
+ public void stop() throws Exception
{
- if (!started)
+ lock();
+ try
{
- return;
- }
+ if (!started)
+ {
+ return;
+ }
- started = false;
+ started = false;
- for (PagingStore store : stores.values())
+ for (PagingStore store : stores.values())
+ {
+ store.stop();
+ }
+
+ pagingStoreFactory.stop();
+ }
+ finally
{
- store.stop();
+ unlock();
}
-
- pagingStoreFactory.stop();
}
public void processReload() throws Exception
@@ -245,17 +267,52 @@
// Private -------------------------------------------------------
- protected PagingStore newStore(final SimpleString address)
+ private PagingStore newStore(final SimpleString address) throws Exception
{
- return pagingStoreFactory.newStore(address,
- addressSettingsRepository.getMatch(address.toString()));
+ lock();
+ try {
+ PagingStore store = stores.get(address);
+ if (store == null)
+ {
+ store = pagingStoreFactory.newStore(address, addressSettingsRepository.getMatch(address.toString()));
+ store.start();
+ stores.put(address, store);
+ }
+ return store;
+ }
+ finally
+ {
+ unlock();
+ }
}
-
- protected PagingStoreFactory getStoreFactory()
+
+ private void unlock()
{
- return pagingStoreFactory;
+ syncLock.writeLock().unlock();
}
- // Inner classes -------------------------------------------------
+ private void lock()
+ {
+ syncLock.writeLock().lock();
+ }
+ @Override
+ public synchronized void lockAll()
+ {
+ syncLock.writeLock().lock();
+ for (PagingStore store : stores.values())
+ {
+ store.lock();
+ }
+ }
+
+ @Override
+ public void unlockAll()
+ {
+ for (PagingStore store : stores.values())
+ {
+ store.unlock();
+ }
+ syncLock.writeLock().unlock();
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -40,7 +40,7 @@
import org.hornetq.utils.UUIDGenerator;
/**
- *
+ *
* Integration point between Paging and NIO
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
@@ -62,9 +62,9 @@
protected final boolean syncNonTransactional;
private PagingManager pagingManager;
-
+
private final ScheduledExecutorService scheduledExecutor;
-
+
private final long syncTimeout;
private StorageManager storageManager;
@@ -86,9 +86,9 @@
this.executorFactory = executorFactory;
this.syncNonTransactional = syncNonTransactional;
-
+
this.scheduledExecutor = scheduledExecutor;
-
+
this.syncTimeout = syncTimeout;
}
@@ -155,6 +155,7 @@
public void setPostOffice(final PostOffice postOffice)
{
+ assert this.postOffice == null;
this.postOffice = postOffice;
}
@@ -233,22 +234,22 @@
{
return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName, false);
}
-
+
protected PagingManager getPagingManager()
{
return pagingManager;
}
-
+
protected StorageManager getStorageManager()
{
return storageManager;
}
-
+
protected PostOffice getPostOffice()
{
return postOffice;
}
-
+
protected ExecutorFactory getExecutorFactory()
{
return executorFactory;
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -13,8 +13,9 @@
package org.hornetq.core.paging.impl;
-import java.io.File;
import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -42,10 +43,8 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.impl.LivePageCacheImpl;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
-import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.RouteContextList;
@@ -56,15 +55,13 @@
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionOperation;
-import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
/**
- *
+ *
* @see PagingStore
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
@@ -210,7 +207,7 @@
pageSize = addressSettings.getPageSizeBytes();
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
-
+
if (cursorProvider != null)
{
cursorProvider.setCacheMaxSize(addressSettings.getPageCacheMaxSize());
@@ -219,6 +216,7 @@
// Public --------------------------------------------------------
+ @Override
public String toString()
{
return "PagingStoreImpl(" + this.address + ")";
@@ -375,21 +373,28 @@
public synchronized void stop() throws Exception
{
- if (running)
+ lock();
+ try
{
+ if (running)
+ {
+ cursorProvider.stop();
- cursorProvider.stop();
+ running = false;
- running = false;
+ flushExecutors();
- flushExecutors();
-
- if (currentPage != null)
- {
- currentPage.close();
- currentPage = null;
+ if (currentPage != null)
+ {
+ currentPage.close();
+ currentPage = null;
+ }
}
}
+ finally
+ {
+ unlock();
+ }
}
public void flushExecutors()
@@ -556,8 +561,8 @@
{
return currentPage;
}
-
- public boolean checkPage(final int pageNumber)
+
+ public boolean checkPageFileExists(final int pageNumber)
{
String fileName = createFileName(pageNumber);
SequentialFile file = fileFactory.createSequentialFile(fileName, 1);
@@ -566,7 +571,10 @@
public Page createPage(final int pageNumber) throws Exception
{
- String fileName = createFileName(pageNumber);
+ lock();
+ try
+ {
+ String fileName = createFileName(pageNumber);
if (fileFactory == null)
{
@@ -585,6 +593,12 @@
file.close();
return page;
+ }
+
+ finally
+ {
+ unlock();
+ }
}
public void forceAnotherPage() throws Exception
@@ -592,13 +606,13 @@
openNewPage();
}
- /**
- * It returns a Page out of the Page System without reading it.
+ /**
+ * It returns a Page out of the Page System without reading it.
* The method calling this method will remove the page and will start reading it outside of any locks.
* This method could also replace the current file by a new file, and that process is done through acquiring a writeLock on currentPageLock
- *
- * Observation: This method is used internally as part of the regular depage process, but externally is used only on tests,
- * and that's why this method is part of the Testable Interface
+ *
+ * Observation: This method is used internally as part of the regular depage process, but externally is used only on tests,
+ * and that's why this method is part of the Testable Interface
* */
public Page depage() throws Exception
{
@@ -681,7 +695,7 @@
* @return
* @throws Exception
*/
- private Queue<OurRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<OurRunnable>();
+ private final Queue<OurRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<OurRunnable>();
private class MemoryFreedRunnablesExecutor implements Runnable
{
@@ -866,7 +880,7 @@
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), tx == null ? -1 : tx.getID());
-
+
if (message.isLargeMessage())
{
((LargeServerMessage)message).setPaged();
@@ -880,9 +894,9 @@
openNewPage();
currentPageSize.addAndGet(bytesToWrite);
}
-
+
currentPage.write(pagedMessage);
-
+
if (tx != null)
{
installPageTransaction(tx, listCtx);
@@ -945,15 +959,15 @@
private static class FinishPageMessageOperation implements TransactionOperation
{
public final PageTransactionInfo pageTransaction;
-
+
private final StorageManager storageManager;
-
+
private final PagingManager pagingManager;
-
+
private final Set<PagingStore> usedStores = new HashSet<PagingStore>();
private boolean stored = false;
-
+
public void addStore(PagingStore store)
{
this.usedStores.add(store);
@@ -1078,9 +1092,9 @@
}
/**
- *
+ *
* Note: Decimalformat is not thread safe, Use synchronization before calling this method
- *
+ *
* @param pageID
* @return
*/
@@ -1100,5 +1114,41 @@
return maxSize > 0 && getAddressSize() > maxSize;
}
+ @Override
+ public Collection<Integer> getCurrentIds() throws Exception
+ {
+ List<Integer> ids = new ArrayList<Integer>();
+ if (fileFactory != null)
+ {
+ for (String fileName : fileFactory.listFiles("page"))
+ {
+ ids.add(getPageIdFromFileName(fileName));
+ }
+ }
+ return ids;
+ }
+
+ @Override
+ public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception
+ {
+ lock();
+ try
+ {
+ for (Integer id : pageIds)
+ {
+ SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id), 1);
+ if (!sFile.exists())
+ {
+ continue;
+ }
+ replicator.syncPages(sFile, id, getAddress());
+ }
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -14,7 +14,6 @@
package org.hornetq.core.persistence.impl.journal;
import java.io.File;
-import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.security.AccessController;
@@ -350,9 +349,10 @@
/**
* XXX FIXME HORNETQ-720 Method ignores the synchronization of Paging.
* @param replicationManager
+ * @param pagingManager
* @throws HornetQException
*/
- public void startReplication(ReplicationManager replicationManager) throws Exception
+ public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception
{
if (!started)
{
@@ -374,6 +374,7 @@
final boolean messageJournalAutoReclaim = localMessageJournal.getAutoReclaim();
final boolean bindingsJournalAutoReclaim = localBindingsJournal.getAutoReclaim();
Map<String, Long> largeMessageFilesToSync;
+ Map<SimpleString, Collection<Integer>> pageFilesToSync;
try
{
storageManagerLock.writeLock().lock();
@@ -385,9 +386,18 @@
localBindingsJournal.writeLock();
try
{
- messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
- bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
- largeMessageFilesToSync = getLargeMessageInformation();
+ pagingManager.lockAll();
+ try
+ {
+ messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
+ bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+ pageFilesToSync = getPageInformationForSync(pagingManager);
+ largeMessageFilesToSync = getLargeMessageInformation();
+ }
+ finally
+ {
+ pagingManager.unlockAll();
+ }
}
finally
{
@@ -405,6 +415,7 @@
sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
sendLargeMessageFiles(largeMessageFilesToSync);
+ sendPagesToBackup(pageFilesToSync, pagingManager);
storageManagerLock.writeLock().lock();
try
@@ -424,6 +435,42 @@
}
}
+ /**
+ * @param pageFilesToSync
+ * @throws Exception
+ */
+ private void sendPagesToBackup(Map<SimpleString, Collection<Integer>> pageFilesToSync, PagingManager manager)
+ throws Exception
+ {
+ for (Entry<SimpleString, Collection<Integer>> entry : pageFilesToSync.entrySet())
+ {
+ PagingStore store = manager.getPageStore(entry.getKey());
+ store.sendPages(replicator, entry.getValue());
+ }
+
+ }
+
+ /**
+ * @param pagingManager
+ * @return
+ * @throws Exception
+ */
+ private Map<SimpleString, Collection<Integer>> getPageInformationForSync(PagingManager pagingManager)
+ throws Exception
+ {
+ Map<SimpleString, Collection<Integer>> info = new HashMap<SimpleString, Collection<Integer>>();
+ for (SimpleString storeName : pagingManager.getStoreNames())
+ {
+ PagingStore store = pagingManager.getPageStore(storeName);
+ List<Integer> ids = new ArrayList<Integer>();
+ info.put(storeName, store.getCurrentIds());
+ // XXX perhaps before? unnecessary?
+ store.forceAnotherPage();
+ }
+ replicator.sendPagingInfo(info);
+ return info;
+ }
+
private void sendLargeMessageFiles(Map<String, Long> largeMessageFilesToSync) throws Exception
{
for (Entry<String, Long> entry : largeMessageFilesToSync.entrySet())
@@ -464,11 +511,7 @@
}
/**
- * Send an entire journal file to a replicating server (a backup server that is).
- * @param jf
- * @param replicator2
- * @throws IOException
- * @throws HornetQException
+ * Send an entire journal file to a replicating backup server.
*/
private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws Exception
{
@@ -809,7 +852,7 @@
readLock();
try
{
- messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
+ messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
}
finally
{
@@ -855,9 +898,9 @@
readLock();
try
{
- pageTransaction.setRecordID(generateUniqueID());
+ pageTransaction.setRecordID(generateUniqueID());
- messageJournal.appendAddRecordTransactional(txID,
+ messageJournal.appendAddRecordTransactional(txID,
pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
pageTransaction);
@@ -873,11 +916,10 @@
readLock();
try
{
- messageJournal.appendUpdateRecordTransactional(txID,
- pageTransaction.getRecordID(),
- JournalStorageManager.PAGE_TRANSACTION,
- new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
- depages));
+ messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(),
+ JournalStorageManager.PAGE_TRANSACTION,
+ new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+ depages));
}
finally
{
@@ -890,7 +932,7 @@
readLock();
try
{
- messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
+ messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
syncNonTransactional,
@@ -907,7 +949,7 @@
readLock();
try
{
- messageJournal.appendUpdateRecordTransactional(txID,
+ messageJournal.appendUpdateRecordTransactional(txID,
messageID,
JournalStorageManager.ADD_REF,
new RefEncoding(queueID));
@@ -923,7 +965,7 @@
readLock();
try
{
- messageJournal.appendUpdateRecordTransactional(txID,
+ messageJournal.appendUpdateRecordTransactional(txID,
messageID,
JournalStorageManager.ACKNOWLEDGE_REF,
new RefEncoding(queueID));
@@ -1042,7 +1084,7 @@
readLock();
try
{
- messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
+ messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
}
finally
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -102,6 +102,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCurrentPagesMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -536,6 +537,11 @@
packet = new ReplicationSyncFileMessage();
break;
}
+ case PacketImpl.REPLICATION_CURRENT_PAGES_INFO:
+ {
+ packet = new ReplicationCurrentPagesMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -196,6 +196,7 @@
public static final byte HA_BACKUP_REGISTRATION = 113;
public static final byte REPLICATION_START_STOP_SYNC = 120;
+ public static final byte REPLICATION_CURRENT_PAGES_INFO = 121;
// Static --------------------------------------------------------
Added: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -0,0 +1,77 @@
+/**
+ *
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+public final class ReplicationCurrentPagesMessage extends PacketImpl
+{
+
+ private Map<SimpleString, Collection<Integer>> info;
+
+ /**
+ * @param type
+ */
+ public ReplicationCurrentPagesMessage()
+ {
+ super(REPLICATION_CURRENT_PAGES_INFO);
+ }
+
+ /**
+ * @param info
+ */
+ public ReplicationCurrentPagesMessage(Map<SimpleString, Collection<Integer>> info)
+ {
+ this();
+ this.info = info;
+ }
+
+ @Override
+ public void decodeRest(HornetQBuffer buffer)
+ {
+ info = new HashMap<SimpleString, Collection<Integer>>();
+ int entries = buffer.readInt();
+ for (int i = 0; i < entries; i++)
+ {
+ SimpleString name = buffer.readSimpleString();
+ int nPages = buffer.readInt();
+ List<Integer> ids = new ArrayList<Integer>(nPages);
+ for (int j = 0; j < nPages; j++)
+ {
+ ids.add(Integer.valueOf(buffer.readInt()));
+ }
+ info.put(name, ids);
+ }
+ }
+
+ @Override
+ public void encodeRest(HornetQBuffer buffer)
+ {
+ buffer.writeInt(info.size());
+ for (Entry<SimpleString, Collection<Integer>> entry : info.entrySet())
+ {
+ buffer.writeSimpleString(entry.getKey());
+ Collection<Integer> value = entry.getValue();
+ buffer.writeInt(value.size());
+ for (Integer id : value)
+ {
+ buffer.writeInt(id);
+ }
+ }
+ }
+
+ public Map<SimpleString, Collection<Integer>> getInfo()
+ {
+ return info;
+ }
+}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -1,17 +1,17 @@
package org.hornetq.core.protocol.core.impl.wireformat;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
- * Message is used to:
- * <ol>
- * <li>copy JournalFile data over to the backup during synchronization;
- * <li>send a up-to-date signal to backup;
- * </ol>
+ * Message is used to sync {@link SequentialFile}s to a backup server. The {@link FileType} controls
+ * which extra information is sent.
*/
public final class ReplicationSyncFileMessage extends PacketImpl
{
@@ -28,31 +28,89 @@
private int dataSize;
private ByteBuffer byteBuffer;
private byte[] byteArray;
+ private SimpleString pageStoreName;
+ private FileType fileType;
+ public enum FileType
+ {
+ JOURNAL(0), PAGE(1), LARGE_MESSAGE(2);
+
+ private byte code;
+
+ private FileType(int code)
+ {
+ this.code = (byte)code;
+ }
+
+ /**
+ * @param readByte
+ * @return
+ */
+ public static FileType getFileType(byte readByte)
+ {
+ for (FileType type : EnumSet.allOf(FileType.class))
+ {
+ if (type.code == readByte)
+ return type;
+ }
+ throw new InternalError("Unsupported byte value for " + FileType.class);
+ }
+ }
+
public ReplicationSyncFileMessage()
{
super(REPLICATION_SYNC_FILE);
}
- public ReplicationSyncFileMessage(JournalContent content, long id, int size, ByteBuffer buffer)
+ public ReplicationSyncFileMessage(JournalContent content, SimpleString storeName, long id, int size,
+ ByteBuffer buffer)
{
this();
this.byteBuffer = buffer;
+ this.pageStoreName = storeName;
this.dataSize = size;
this.fileId = id;
this.journalType = content;
+ determineType();
}
+ private void determineType()
+ {
+ if (journalType != null)
+ {
+ fileType = FileType.JOURNAL;
+ }
+ else if (pageStoreName != null)
+ {
+ fileType = FileType.PAGE;
+ }
+ else
+ {
+ fileType = FileType.LARGE_MESSAGE;
+ }
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeLong(fileId);
if (fileId == -1)
return;
- boolean isJournal = journalType != null;
- buffer.writeBoolean(isJournal);
- if (isJournal)
- buffer.writeByte(journalType.typeByte);
+ buffer.writeByte(fileType.code);
+ switch (fileType)
+ {
+ case JOURNAL:
+ {
+ buffer.writeByte(journalType.typeByte);
+ break;
+ }
+ case PAGE:
+ {
+ buffer.writeSimpleString(pageStoreName);
+ break;
+ }
+ }
+
buffer.writeInt(dataSize);
/*
* sending -1 will close the file in case of a journal, but not in case of a largeMessage
@@ -68,9 +126,25 @@
public void decodeRest(final HornetQBuffer buffer)
{
fileId = buffer.readLong();
- if (buffer.readBoolean())
+ switch (FileType.getFileType(buffer.readByte()))
{
- journalType = JournalContent.getType(buffer.readByte());
+ case JOURNAL:
+ {
+ journalType = JournalContent.getType(buffer.readByte());
+ fileType = FileType.JOURNAL;
+ break;
+ }
+ case PAGE:
+ {
+ pageStoreName = buffer.readSimpleString();
+ fileType = FileType.PAGE;
+ break;
+ }
+ case LARGE_MESSAGE:
+ {
+ fileType = FileType.LARGE_MESSAGE;
+ break;
+ }
}
int size = buffer.readInt();
if (size > 0)
@@ -90,19 +164,18 @@
return journalType;
}
- /**
- * @return
- */
public byte[] getData()
{
return byteArray;
}
- /**
- * @return
- */
- public boolean isLargeMessage()
+ public FileType getFileType()
{
- return journalType == null;
+ return fileType;
}
+
+ public SimpleString getPageStore()
+ {
+ return pageStoreName;
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -13,6 +13,8 @@
package org.hornetq.core.replication;
+import java.util.Collection;
+import java.util.Map;
import java.util.Set;
import org.hornetq.api.core.HornetQException;
@@ -114,4 +116,14 @@
* @throws Exception
*/
void syncLargeMessageFile(SequentialFile seqFile, long size, long id) throws Exception;
+
+ void sendPagingInfo(Map<SimpleString, Collection<Integer>> info);
+
+ /**
+ * @param file
+ * @param id
+ * @param pageStore
+ * @throws Exception
+ */
+ void syncPages(SequentialFile file, long id, SimpleString pageStore) throws Exception;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -14,6 +14,7 @@
package org.hornetq.core.replication.impl;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@@ -47,6 +48,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCurrentPagesMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -205,6 +207,10 @@
{
handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
}
+ else if (type == PacketImpl.REPLICATION_CURRENT_PAGES_INFO)
+ {
+ handleCurrentPagesInfo((ReplicationCurrentPagesMessage)packet);
+ }
else
{
log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
@@ -224,17 +230,22 @@
channel.send(response);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ /**
+ * @param packet
*/
+ private void handleCurrentPagesInfo(ReplicationCurrentPagesMessage packet)
+ {
+ for (Entry<SimpleString, Collection<Integer>> entry : packet.getInfo().entrySet())
+ {
+ // ignore the actual file list for the moment...
+ }
+ }
+
public boolean isStarted()
{
return started;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#start()
- */
public void start() throws Exception
{
Configuration config = server.getConfiguration();
@@ -429,7 +440,6 @@
SequentialFile sq = lm.getFile();
LargeServerMessage mainLM = largeMessagesOnSync.get(id);
SequentialFile mainSeqFile = mainLM.getFile();
- System.out.println(mainSeqFile);
for (;;)
{
buffer.rewind();
@@ -463,27 +473,41 @@
Long id = Long.valueOf(msg.getId());
byte[] data = msg.getData();
SequentialFile sf;
- if (msg.isLargeMessage())
+ switch (msg.getFileType())
{
- synchronized (largeMessagesOnSync)
+ case LARGE_MESSAGE:
{
- LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
- if (largeMessage == null)
+ synchronized (largeMessagesOnSync)
{
- largeMessage = storage.createLargeMessage();
- largeMessage.setDurable(true);
- largeMessage.setMessageID(id);
- largeMessagesOnSync.put(id, largeMessage);
+ LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
+ if (largeMessage == null)
+ {
+ largeMessage = storage.createLargeMessage();
+ largeMessage.setDurable(true);
+ largeMessage.setMessageID(id);
+ largeMessagesOnSync.put(id, largeMessage);
+ }
+ sf = largeMessage.getFile();
}
- sf = largeMessage.getFile();
+ break;
}
+ case JOURNAL:
+ {
+ JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(id);
+ sf = journalFile.getFile();
+ break;
+ }
+ case PAGE:
+ {
+ Page page = getPage(msg.getPageStore(), (int)msg.getId());
+
+ sf = page.getFile();
+ break;
+ }
+ default:
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Unhandled file type " + msg.getFileType());
}
- else
- {
- JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(id);
- sf = journalFile.getFile();
- }
if (data == null)
{
sf.close();
@@ -751,7 +775,6 @@
page.close();
}
}
-
}
/**
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -14,7 +14,9 @@
package org.hornetq.core.replication.impl;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.LinkedHashSet;
+import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -42,6 +44,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCurrentPagesMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -508,24 +511,32 @@
{
SequentialFile file = jf.getFile().copy();
log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
- sendLargeFile(content, jf.getFileID(), file, Long.MAX_VALUE);
+ sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
}
@Override
public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception
{
- sendLargeFile(null, id, file, size);
+ sendLargeFile(null, null, id, file, size);
}
+ @Override
+ public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception
+ {
+ sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
+ }
+
/**
* Sends large files in reasonably sized chunks to the backup during replication synchronization.
- * @param content journal type or {@code null} for large-messages
+ * @param content journal type or {@code null} for large-messages and pages
+ * @param pageStore page store name for pages, or {@code null} otherwise
* @param id journal file id or (large) message id
* @param file
* @param maxBytesToSend maximum number of bytes to read and send from the file
* @throws Exception
*/
- private void sendLargeFile(JournalContent content, final long id, SequentialFile file, long maxBytesToSend)
+ private void sendLargeFile(JournalContent content, SimpleString pageStore, final long id, SequentialFile file,
+ long maxBytesToSend)
throws Exception
{
if (!file.isOpen())
@@ -554,7 +565,7 @@
buffer.rewind();
// sending -1 or 0 bytes will close the file at the backup
- sendReplicatePacket(new ReplicationSyncFileMessage(content, id, bytesRead, buffer));
+ sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, bytesRead, buffer));
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
}
@@ -572,4 +583,10 @@
ReplicationStartSyncMessage msg = new ReplicationStartSyncMessage(null, null);
sendReplicatePacket(msg);
}
+
+ @Override
+ public void sendPagingInfo(Map<SimpleString, Collection<Integer>> info)
+ {
+ sendReplicatePacket(new ReplicationCurrentPagesMessage(info));
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -58,7 +58,6 @@
import org.hornetq.core.deployers.impl.SecurityDeployer;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
-import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.impl.SyncSpeedTest;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
@@ -1564,7 +1563,7 @@
pagingManager.reloadStores();
- JournalLoadInformation[] journalInfo = loadJournals();
+ loadJournals();
final ServerInfo dumper = new ServerInfo(this, pagingManager);
@@ -1646,15 +1645,13 @@
}
}
- private JournalLoadInformation[] loadJournals() throws Exception
+ private void loadJournals() throws Exception
{
- JournalLoadInformation[] journalInfo = new JournalLoadInformation[2];
-
List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
List<GroupingInfo> groupingInfos = new ArrayList<GroupingInfo>();
- journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
+ storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
recoverStoredConfigs();
@@ -1685,8 +1682,6 @@
managementService.registerAddress(queueBindingInfo.getAddress());
managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
-
-
}
for (GroupingInfo groupingInfo : groupingInfos)
@@ -1701,7 +1696,7 @@
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
- journalInfo[1] = storageManager.loadMessageJournal(postOffice,
+ storageManager.loadMessageJournal(postOffice,
pagingManager,
resourceManager,
queues,
@@ -1720,7 +1715,6 @@
}
}
- return journalInfo;
}
/**
@@ -2012,11 +2006,10 @@
}
JournalStorageManager journalStorageManager = (JournalStorageManager)storageManager;
-
replicationManager = new ReplicationManagerImpl(rc, executorFactory);
replicationManager.start();
- journalStorageManager.startReplication(replicationManager);
+ journalStorageManager.startReplication(replicationManager, pagingManager);
}
/**
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -19,6 +19,7 @@
import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
/**
@@ -118,7 +119,8 @@
@Override
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
{
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
+ writeRecord(deleteRecordTX, false, null);
}
@Override
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -13,6 +13,7 @@
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -27,7 +28,7 @@
private ClientSession session;
private ClientProducer producer;
private BackupSyncDelay syncDelay;
- protected static final int N_MSGS = 10;
+ protected int n_msgs = 10;
@Override
protected void setUp() throws Exception
@@ -59,17 +60,22 @@
for (int i = 0; i < totalRounds; i++)
{
messageJournal.forceMoveNextFile();
- sendMessages(session, producer, N_MSGS);
+ sendMessages(session, producer, n_msgs);
}
backupServer.start();
-
+ syncDelay.deliverUpToDateMsg();
waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
// SEND more messages, now with the backup replicating
- sendMessages(session, producer, N_MSGS);
+ sendMessages(session, producer, n_msgs);
Set<Long> liveIds = getFileIds(messageJournal);
-
+ PagingStore ps = liveServer.getServer().getPagingManager().getPageStore(ADDRESS);
+ if (ps.getPageSizeBytes() == PAGE_SIZE)
+ {
+ assertTrue("isStarted", ps.isStarted());
+ assertFalse("start paging should return false, because we expect paging to be running", ps.startPaging());
+ }
finishSyncAndFailover();
JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
@@ -79,7 +85,7 @@
// "+ 2": there two other calls that send N_MSGS.
for (int i = 0; i < totalRounds + 2; i++)
{
- receiveMsgsInRange(0, N_MSGS);
+ receiveMsgsInRange(0, n_msgs);
}
assertNoMoreMessages();
}
@@ -108,13 +114,13 @@
backupServer.start();
waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
- sendMessages(session, producer, N_MSGS);
+ sendMessages(session, producer, n_msgs);
session.commit();
- receiveMsgsInRange(0, N_MSGS);
+ receiveMsgsInRange(0, n_msgs);
finishSyncAndFailover();
- receiveMsgsInRange(0, N_MSGS);
+ receiveMsgsInRange(0, n_msgs);
assertNoMoreMessages();
}
@@ -131,16 +137,16 @@
{
createProducerSendSomeMessages();
startBackupCrashLive();
- receiveMsgsInRange(0, N_MSGS);
+ receiveMsgsInRange(0, n_msgs);
assertNoMoreMessages();
}
public void testMessageSync() throws Exception
{
createProducerSendSomeMessages();
- receiveMsgsInRange(0, N_MSGS / 2);
+ receiveMsgsInRange(0, n_msgs / 2);
startBackupCrashLive();
- receiveMsgsInRange(N_MSGS / 2, N_MSGS);
+ receiveMsgsInRange(n_msgs / 2, n_msgs);
assertNoMoreMessages();
}
@@ -159,7 +165,7 @@
session = sessionFactory.createSession(true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer, N_MSGS);
+ sendMessages(session, producer, n_msgs);
session.commit();
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -40,8 +40,8 @@
File dir = new File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
System.out.println("Dir " + dir.getAbsolutePath() + " " + dir.exists());
// Set<Long> idsOnBkp = getAllMessageFileIds(dir);
- receiveMsgsInRange(0, N_MSGS / 2);
- assertEquals("we really ought to delete these after delivery", N_MSGS / 2, getAllMessageFileIds(dir).size());
+ receiveMsgsInRange(0, n_msgs / 2);
+ assertEquals("we really ought to delete these after delivery", n_msgs / 2, getAllMessageFileIds(dir).size());
}
private Set<Long> getAllMessageFileIds(File dir)
Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncPagingTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncPagingTest.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncPagingTest.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -0,0 +1,34 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+
+public class BackupSyncPagingTest extends BackupSyncJournalTest
+{
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ n_msgs = 100;
+ super.setUp();
+ }
+
+ @Override
+ protected HornetQServer createInVMFailoverServer(final boolean realFiles, final Configuration configuration,
+ final NodeManager nodeManager)
+ {
+ Map<String, AddressSettings> conf = new HashMap<String, AddressSettings>();
+ AddressSettings as = new AddressSettings();
+ as.setMaxSizeBytes(PAGE_MAX);
+ as.setPageSizeBytes(PAGE_SIZE);
+ as.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ conf.put(ADDRESS.toString(), as);
+ return createInVMFailoverServer(realFiles, configuration, PAGE_SIZE, PAGE_MAX, conf, nodeManager);
+ }
+}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -68,6 +68,9 @@
protected static final int MIN_LARGE_MESSAGE = 1024;
private static final int LARGE_MESSAGE_SIZE = MIN_LARGE_MESSAGE * 3;
+ protected static final int PAGE_MAX = 2 * 1024;
+ protected static final int PAGE_SIZE = 1024;
+
// Attributes ----------------------------------------------------
protected TestableServer liveServer;
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -46,12 +46,8 @@
// Constants -----------------------------------------------------
- private static final int PAGE_MAX = 100 * 1024;
+ private static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
- private static final int PAGE_SIZE = 10 * 1024;
-
- static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
-
// Attributes ----------------------------------------------------
private ServerLocator locator;
@@ -91,7 +87,7 @@
internalTestPage(true, false);
}
- public void testPageTransactionedFailBeforeconsume() throws Exception
+ public void testPageTransactionedFailBeforeConsume() throws Exception
{
internalTestPage(true, true);
}
@@ -130,9 +126,12 @@
if (failBeforeConsume)
{
crash(session);
+ waitForBackup(sf, 60);
}
+
+
session.close();
session = sf.createSession(!transacted, !transacted, 0);
@@ -217,11 +216,7 @@
@Override
protected HornetQServer createServer(final boolean realFiles, final Configuration configuration)
{
- return createInVMFailoverServer(true,
- configuration,
- PagingFailoverTest.PAGE_SIZE,
- PagingFailoverTest.PAGE_MAX,
- new HashMap<String, AddressSettings>(),
+ return createInVMFailoverServer(true, configuration, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>(),
nodeManager);
}
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -14,9 +14,7 @@
package org.hornetq.tests.unit.core.paging.impl;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -53,6 +51,7 @@
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.hornetq.tests.unit.util.FakePagingManager;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.ExecutorFactory;
@@ -939,155 +938,6 @@
// Inner classes -------------------------------------------------
- class FakePagingManager implements PagingManager
- {
-
- public void activate()
- {
- }
-
- public long addSize(final long size)
- {
- return 0;
- }
-
- public void addTransaction(final PageTransactionInfo pageTransaction)
- {
- }
-
- public PagingStore createPageStore(final SimpleString destination) throws Exception
- {
- return null;
- }
-
- public long getTotalMemory()
- {
- return 0;
- }
-
- public SimpleString[] getStoreNames()
- {
- return null;
- }
-
- public long getMaxMemory()
- {
- return 0;
- }
-
- public PagingStore getPageStore(final SimpleString address) throws Exception
- {
- return null;
- }
-
- public void deletePageStore(SimpleString storeName) throws Exception
- {
- }
-
- public PageTransactionInfo getTransaction(final long transactionID)
- {
- return null;
- }
-
- public boolean isBackup()
- {
- return false;
- }
-
- public boolean isGlobalPageMode()
- {
- return false;
- }
-
- public boolean isPaging(final SimpleString destination) throws Exception
- {
- return false;
- }
-
- public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
- {
- return false;
- }
-
- public boolean page(final ServerMessage message, final long transactionId, final boolean duplicateDetection) throws Exception
- {
- return false;
- }
-
- public void reloadStores() throws Exception
- {
- }
-
- public void removeTransaction(final long transactionID)
- {
-
- }
-
- public void setGlobalPageMode(final boolean globalMode)
- {
- }
-
- public void setPostOffice(final PostOffice postOffice)
- {
- }
-
- public void resumeDepages()
- {
- }
-
- public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
- {
- }
-
- public boolean isStarted()
- {
- return false;
- }
-
- public void start() throws Exception
- {
- }
-
- public void stop() throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.PagingManager#isGlobalFull()
- */
- public boolean isGlobalFull()
- {
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.PagingManager#getTransactions()
- */
- public Map<Long, PageTransactionInfo> getTransactions()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.PagingManager#processReload()
- */
- public void processReload()
- {
- // TODO Auto-generated method stub
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
- */
- public void onChange()
- {
- }
-
- }
-
-
class FakeStoreFactory implements PagingStoreFactory
{
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -14,10 +14,8 @@
package org.hornetq.tests.unit.core.postoffice.impl;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -28,18 +26,15 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.paging.PageTransactionInfo;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.impl.ResourceManagerImpl;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
+import org.hornetq.tests.unit.util.FakePagingManager;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.ExecutorFactory;
@@ -186,160 +181,4 @@
}
}
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
- static class FakePagingManager implements PagingManager
- {
-
- public void activate()
- {
- }
-
- public long addSize(final long size)
- {
- return 0;
- }
-
- public void addTransaction(final PageTransactionInfo pageTransaction)
- {
- }
-
- public PagingStore createPageStore(final SimpleString destination) throws Exception
- {
- return null;
- }
-
- public long getTotalMemory()
- {
- return 0;
- }
-
- public SimpleString[] getStoreNames()
- {
- return null;
- }
-
- public long getMaxMemory()
- {
- return 0;
- }
-
- public PagingStore getPageStore(final SimpleString address) throws Exception
- {
- return null;
- }
-
- public void deletePageStore(SimpleString storeName) throws Exception
- {
- }
-
- public PageTransactionInfo getTransaction(final long transactionID)
- {
- return null;
- }
-
- public boolean isBackup()
- {
- return false;
- }
-
- public boolean isGlobalPageMode()
- {
- return false;
- }
-
- public boolean isPaging(final SimpleString destination) throws Exception
- {
- return false;
- }
-
- public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
- {
- return false;
- }
-
- public boolean page(final ServerMessage message, final long transactionId, final boolean duplicateDetection) throws Exception
- {
- return false;
- }
-
- public void reloadStores() throws Exception
- {
- }
-
- public void removeTransaction(final long transactionID)
- {
-
- }
-
- public void setGlobalPageMode(final boolean globalMode)
- {
- }
-
- public void setPostOffice(final PostOffice postOffice)
- {
- }
-
- public void resumeDepages()
- {
- }
-
- public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
- {
- }
-
- public boolean isStarted()
- {
- return false;
- }
-
- public void start() throws Exception
- {
- }
-
- public void stop() throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.PagingManager#isGlobalFull()
- */
- public boolean isGlobalFull()
- {
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.PagingManager#getTransactions()
- */
- public Map<Long, PageTransactionInfo> getTransactions()
- {
- return null;
- }
-
-
-
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.PagingManager#processReload()
- */
- public void processReload()
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
- */
- public void onChange()
- {
- }
-
- }
-
}
Added: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/FakePagingManager.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/FakePagingManager.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/FakePagingManager.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -0,0 +1,173 @@
+package org.hornetq.tests.unit.util;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.ServerMessage;
+
+public final class FakePagingManager implements PagingManager
+{
+
+ public void activate()
+ {
+ }
+
+ public long addSize(final long size)
+ {
+ return 0;
+ }
+
+ public void addTransaction(final PageTransactionInfo pageTransaction)
+ {
+ }
+
+ public PagingStore createPageStore(final SimpleString destination) throws Exception
+ {
+ return null;
+ }
+
+ public long getTotalMemory()
+ {
+ return 0;
+ }
+
+ public SimpleString[] getStoreNames()
+ {
+ return null;
+ }
+
+ public long getMaxMemory()
+ {
+ return 0;
+ }
+
+ public PagingStore getPageStore(final SimpleString address) throws Exception
+ {
+ return null;
+ }
+
+ public void deletePageStore(SimpleString storeName) throws Exception
+ {
+ }
+
+ public PageTransactionInfo getTransaction(final long transactionID)
+ {
+ return null;
+ }
+
+ public boolean isBackup()
+ {
+ return false;
+ }
+
+ public boolean isGlobalPageMode()
+ {
+ return false;
+ }
+
+ public boolean isPaging(final SimpleString destination) throws Exception
+ {
+ return false;
+ }
+
+ public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
+ {
+ return false;
+ }
+
+ public boolean page(final ServerMessage message, final long transactionId, final boolean duplicateDetection)
+ throws Exception
+ {
+ return false;
+ }
+
+ public void reloadStores() throws Exception
+ {
+ }
+
+ public void removeTransaction(final long transactionID)
+ {
+
+ }
+
+ public void setGlobalPageMode(final boolean globalMode)
+ {
+ }
+
+ public void setPostOffice(final PostOffice postOffice)
+ {
+ }
+
+ public void resumeDepages()
+ {
+ }
+
+ public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
+ {
+ }
+
+ public boolean isStarted()
+ {
+ return false;
+ }
+
+ public void start() throws Exception
+ {
+ }
+
+ public void stop() throws Exception
+ {
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#isGlobalFull()
+ */
+ public boolean isGlobalFull()
+ {
+ return false;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#getTransactions()
+ */
+ public Map<Long, PageTransactionInfo> getTransactions()
+ {
+ return null;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#processReload()
+ */
+ public void processReload()
+ {
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
+ */
+ public void onChange()
+ {
+ }
+
+ @Override
+ public void lockAll()
+ {
+ // no-op
+ }
+
+ @Override
+ public void unlockAll()
+ {
+ // no-op
+ }
+
+}
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -509,7 +509,7 @@
*/
protected void assertMessageBody(final int i, final ClientMessage message)
{
- Assert.assertEquals("message" + i, message.getBodyBuffer().readString());
+ Assert.assertEquals(message.toString(), "message" + i, message.getBodyBuffer().readString());
}
/**
13 years, 4 months
JBoss hornetq SVN: r11298 - in branches/Branch_2_2_EAP/src/config: jboss-as-5/clustered and 1 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-09-07 08:45:04 -0400 (Wed, 07 Sep 2011)
New Revision: 11298
Modified:
branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/hornetq-configuration.xml
branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-configuration.xml
branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-configuration.xml
Log:
https://issues.jboss.org/browse/JBPAPP-5723
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/hornetq-configuration.xml 2011-09-07 12:21:20 UTC (rev 11297)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/hornetq-configuration.xml 2011-09-07 12:45:04 UTC (rev 11298)
@@ -79,8 +79,8 @@
<broadcast-groups>
<broadcast-group name="bg-group1">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
+ <group-address>${hornetq.broadcast.bg-group1.address:231.7.7.7}</group-address>
+ <group-port>${hornetq.broadcast.bg-group1.port:9876}</group-port>
<broadcast-period>5000</broadcast-period>
<connector-ref>netty</connector-ref>
</broadcast-group>
@@ -88,8 +88,8 @@
<discovery-groups>
<discovery-group name="dg-group1">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
+ <group-address>${hornetq.discovery.dg-group1.address:231.7.7.7}</group-address>
+ <group-port>${hornetq.discovery.dg-group1.port:9876}</group-port>
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-configuration.xml 2011-09-07 12:21:20 UTC (rev 11297)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-5/clustered/hornetq-configuration.xml 2011-09-07 12:45:04 UTC (rev 11298)
@@ -79,8 +79,8 @@
<broadcast-groups>
<broadcast-group name="bg-group1">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
+ <group-address>${hornetq.broadcast.bg-group1.address:231.7.7.7}</group-address>
+ <group-port>${hornetq.broadcast.bg-group1.port:9876}</group-port>
<broadcast-period>5000</broadcast-period>
<connector-ref>netty</connector-ref>
</broadcast-group>
@@ -88,8 +88,8 @@
<discovery-groups>
<discovery-group name="dg-group1">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
+ <group-address>${hornetq.discovery.dg-group1.address:231.7.7.7}</group-address>
+ <group-port>${hornetq.discovery.dg-group1.port:9876}</group-port>
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-configuration.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2011-09-07 12:21:20 UTC (rev 11297)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-6/clustered/hornetq-configuration.xml 2011-09-07 12:45:04 UTC (rev 11298)
@@ -79,8 +79,8 @@
<broadcast-groups>
<broadcast-group name="bg-group1">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
+ <group-address>${hornetq.broadcast.bg-group1.address:231.7.7.7}</group-address>
+ <group-port>${hornetq.broadcast.bg-group1.port:9876}</group-port>
<broadcast-period>5000</broadcast-period>
<connector-ref>netty</connector-ref>
</broadcast-group>
@@ -88,8 +88,8 @@
<discovery-groups>
<discovery-group name="dg-group1">
- <group-address>231.7.7.7</group-address>
- <group-port>9876</group-port>
+ <group-address>${hornetq.discovery.dg-group1.address:231.7.7.7}</group-address>
+ <group-port>${hornetq.discovery.dg-group1.port:9876}</group-port>
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups>
13 years, 4 months
JBoss hornetq SVN: r11297 - in branches/Branch_2_2_EAP: src/config/jboss-as-4/clustered and 3 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-09-07 08:21:20 -0400 (Wed, 07 Sep 2011)
New Revision: 11297
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java
Modified:
branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/ra.xml
branches/Branch_2_2_EAP/src/config/jboss-as-4/non-clustered/ra.xml
branches/Branch_2_2_EAP/src/config/ra.xml
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
Log:
https://issues.jboss.org/browse/JBPAPP-5791 - fixed and added test to pick up on any changes
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/ra.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/ra.xml 2011-09-07 03:36:06 UTC (rev 11296)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-4/clustered/ra.xml 2011-09-07 12:21:20 UTC (rev 11297)
@@ -76,20 +76,13 @@
<config-property-type>java.lang.String</config-property-type>
<config-property-value>getTm</config-property-value>
</config-property>
- <!--
- <config-property>
+ <!--<config-property>
<description>Does we support HA</description>
<config-property-name>HA</config-property-name>
<config-property-type>java.lang.Boolean</config-property-type>
<config-property-value>false</config-property-value>
</config-property>
<config-property>
- <description>The method to use for locating the transactionmanager</description>
- <config-property-name>TransactionManagerLocatorMethod</config-property-name>
- <config-property-type>java.lang.String</config-property-type>
- <config-property-value>getTm</config-property-value>
- </config-property>
- <config-property>
<description>Use A local Transaction instead of XA?</description>
<config-property-name>UseLocalTx</config-property-name>
<config-property-type>java.lang.Boolean</config-property-type>
@@ -108,6 +101,12 @@
<config-property-value></config-property-value>
</config-property>
<config-property>
+ <description>The jndi params to use to look up the jms resources if local jndi is not to be used</description>
+ <config-property-name>JndiParams</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+ <config-property-value>java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory;java.naming.provider.url=jnp://localhost:1199;java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces</config-property-value>
+ </config-property>
+ <config-property>
<description>The discovery group address</description>
<config-property-name>DiscoveryAddress</config-property-name>
<config-property-type>java.lang.String</config-property-type>
@@ -132,12 +131,18 @@
<config-property-value></config-property-value>
</config-property>
<config-property>
- <description>The load balancing policy class name</description>
- <config-property-name>LoadBalancingPolicyClassName</config-property-name>
- <config-property-type>java.lang.String</config-property-type>
- <config-property-value></config-property-value>
+ <description>The class to use for load balancing connections</description>
+ <config-property-name>ConnectionLoadBalancingPolicyClassName</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+ <config-property-value></config-property-value>
</config-property>
<config-property>
+ <description>number of reconnect attempts for connections after failover occurs</description>
+ <config-property-name>ReconnectAttempts</config-property-name>
+ <config-property-type>java.lang.Integer</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
<description>The client failure check period</description>
<config-property-name>ClientFailureCheckPeriod</config-property-name>
<config-property-type>java.lang.Long</config-property-type>
@@ -222,11 +227,6 @@
<config-property-value></config-property-value>
</config-property>
<config-property>
- <description>The max connections</description>
- <config-property-type>java.lang.Integer</config-property-type>
- <config-property-value></config-property-value>
- </config-property>
- <config-property>
<description>The pre acknowledge</description>
<config-property-name>PreAcknowledge</config-property-name>
<config-property-type>java.lang.Boolean</config-property-type>
@@ -249,6 +249,42 @@
<config-property-name>ClientID</config-property-name>
<config-property-type>java.lang.String</config-property-type>
<config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>use global pools for client</description>
+ <config-property-name>UseGlobalPools</config-property-name>
+ <config-property-type>java.lang.Boolean</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>max number of threads for scheduled threrad pool</description>
+ <config-property-name>ScheduledThreadPoolMaxSize</config-property-name>
+ <config-property-type>java.lang.Integer</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>max number of threads in pool</description>
+ <config-property-name>ThreadPoolMaxSize</config-property-name>
+ <config-property-type>java.lang.Integer</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>whether to use jndi for looking up destinations etc</description>
+ <config-property-name>UseJNDI</config-property-name>
+ <config-property-type>boolean</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>how long in milliseconds to wait before retry on failed MDB setup</description>
+ <config-property-name>SetupInterval</config-property-name>
+ <config-property-type>long</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>How many attempts should be made when connecting the MDB</description>
+ <config-property-name>SetupAttempts</config-property-name>
+ <config-property-type>int</config-property-type>
+ <config-property-value></config-property-value>
</config-property>-->
<outbound-resourceadapter>
Modified: branches/Branch_2_2_EAP/src/config/jboss-as-4/non-clustered/ra.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/jboss-as-4/non-clustered/ra.xml 2011-09-07 03:36:06 UTC (rev 11296)
+++ branches/Branch_2_2_EAP/src/config/jboss-as-4/non-clustered/ra.xml 2011-09-07 12:21:20 UTC (rev 11297)
@@ -76,20 +76,13 @@
<config-property-type>java.lang.String</config-property-type>
<config-property-value>getTm</config-property-value>
</config-property>
- <!--
- <config-property>
+ <!--<config-property>
<description>Does we support HA</description>
<config-property-name>HA</config-property-name>
<config-property-type>java.lang.Boolean</config-property-type>
<config-property-value>false</config-property-value>
</config-property>
<config-property>
- <description>The method to use for locating the transactionmanager</description>
- <config-property-name>TransactionManagerLocatorMethod</config-property-name>
- <config-property-type>java.lang.String</config-property-type>
- <config-property-value>getTm</config-property-value>
- </config-property>
- <config-property>
<description>Use A local Transaction instead of XA?</description>
<config-property-name>UseLocalTx</config-property-name>
<config-property-type>java.lang.Boolean</config-property-type>
@@ -108,6 +101,12 @@
<config-property-value></config-property-value>
</config-property>
<config-property>
+ <description>The jndi params to use to look up the jms resources if local jndi is not to be used</description>
+ <config-property-name>JndiParams</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+ <config-property-value>java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory;java.naming.provider.url=jnp://localhost:1199;java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces</config-property-value>
+ </config-property>
+ <config-property>
<description>The discovery group address</description>
<config-property-name>DiscoveryAddress</config-property-name>
<config-property-type>java.lang.String</config-property-type>
@@ -132,12 +131,18 @@
<config-property-value></config-property-value>
</config-property>
<config-property>
- <description>The load balancing policy class name</description>
- <config-property-name>LoadBalancingPolicyClassName</config-property-name>
- <config-property-type>java.lang.String</config-property-type>
- <config-property-value></config-property-value>
+ <description>The class to use for load balancing connections</description>
+ <config-property-name>ConnectionLoadBalancingPolicyClassName</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+ <config-property-value></config-property-value>
</config-property>
<config-property>
+ <description>number of reconnect attempts for connections after failover occurs</description>
+ <config-property-name>ReconnectAttempts</config-property-name>
+ <config-property-type>java.lang.Integer</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
<description>The client failure check period</description>
<config-property-name>ClientFailureCheckPeriod</config-property-name>
<config-property-type>java.lang.Long</config-property-type>
@@ -222,11 +227,6 @@
<config-property-value></config-property-value>
</config-property>
<config-property>
- <description>The max connections</description>
- <config-property-type>java.lang.Integer</config-property-type>
- <config-property-value></config-property-value>
- </config-property>
- <config-property>
<description>The pre acknowledge</description>
<config-property-name>PreAcknowledge</config-property-name>
<config-property-type>java.lang.Boolean</config-property-type>
@@ -249,6 +249,42 @@
<config-property-name>ClientID</config-property-name>
<config-property-type>java.lang.String</config-property-type>
<config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>use global pools for client</description>
+ <config-property-name>UseGlobalPools</config-property-name>
+ <config-property-type>java.lang.Boolean</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>max number of threads for scheduled threrad pool</description>
+ <config-property-name>ScheduledThreadPoolMaxSize</config-property-name>
+ <config-property-type>java.lang.Integer</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>max number of threads in pool</description>
+ <config-property-name>ThreadPoolMaxSize</config-property-name>
+ <config-property-type>java.lang.Integer</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>whether to use jndi for looking up destinations etc</description>
+ <config-property-name>UseJNDI</config-property-name>
+ <config-property-type>boolean</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>how long in milliseconds to wait before retry on failed MDB setup</description>
+ <config-property-name>SetupInterval</config-property-name>
+ <config-property-type>long</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>How many attempts should be made when connecting the MDB</description>
+ <config-property-name>SetupAttempts</config-property-name>
+ <config-property-type>int</config-property-type>
+ <config-property-value></config-property-value>
</config-property>-->
<outbound-resourceadapter>
Modified: branches/Branch_2_2_EAP/src/config/ra.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/ra.xml 2011-09-07 03:36:06 UTC (rev 11296)
+++ branches/Branch_2_2_EAP/src/config/ra.xml 2011-09-07 12:21:20 UTC (rev 11297)
@@ -51,20 +51,13 @@
<config-property-type>java.lang.String</config-property-type>
<config-property-value>server-id=0</config-property-value>
</config-property>
- <!--
- <config-property>
+ <!--<config-property>
<description>Does we support HA</description>
<config-property-name>HA</config-property-name>
<config-property-type>java.lang.Boolean</config-property-type>
<config-property-value>false</config-property-value>
</config-property>
<config-property>
- <description>The method to use for locating the transactionmanager</description>
- <config-property-name>TransactionManagerLocatorMethod</config-property-name>
- <config-property-type>java.lang.String</config-property-type>
- <config-property-value>getTm</config-property-value>
- </config-property>
- <config-property>
<description>Use A local Transaction instead of XA?</description>
<config-property-name>UseLocalTx</config-property-name>
<config-property-type>java.lang.Boolean</config-property-type>
@@ -113,12 +106,18 @@
<config-property-value></config-property-value>
</config-property>
<config-property>
- <description>The load balancing policy class name</description>
- <config-property-name>LoadBalancingPolicyClassName</config-property-name>
- <config-property-type>java.lang.String</config-property-type>
- <config-property-value></config-property-value>
+ <description>The class to use for load balancing connections</description>
+ <config-property-name>ConnectionLoadBalancingPolicyClassName</config-property-name>
+ <config-property-type>java.lang.String</config-property-type>
+ <config-property-value></config-property-value>
</config-property>
<config-property>
+ <description>number of reconnect attempts for connections after failover occurs</description>
+ <config-property-name>ReconnectAttempts</config-property-name>
+ <config-property-type>java.lang.Integer</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
<description>The client failure check period</description>
<config-property-name>ClientFailureCheckPeriod</config-property-name>
<config-property-type>java.lang.Long</config-property-type>
@@ -203,11 +202,6 @@
<config-property-value></config-property-value>
</config-property>
<config-property>
- <description>The max connections</description>
- <config-property-type>java.lang.Integer</config-property-type>
- <config-property-value></config-property-value>
- </config-property>
- <config-property>
<description>The pre acknowledge</description>
<config-property-name>PreAcknowledge</config-property-name>
<config-property-type>java.lang.Boolean</config-property-type>
@@ -230,6 +224,42 @@
<config-property-name>ClientID</config-property-name>
<config-property-type>java.lang.String</config-property-type>
<config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>use global pools for client</description>
+ <config-property-name>UseGlobalPools</config-property-name>
+ <config-property-type>java.lang.Boolean</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>max number of threads for scheduled threrad pool</description>
+ <config-property-name>ScheduledThreadPoolMaxSize</config-property-name>
+ <config-property-type>java.lang.Integer</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>max number of threads in pool</description>
+ <config-property-name>ThreadPoolMaxSize</config-property-name>
+ <config-property-type>java.lang.Integer</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>whether to use jndi for looking up destinations etc</description>
+ <config-property-name>UseJNDI</config-property-name>
+ <config-property-type>boolean</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>how long in milliseconds to wait before retry on failed MDB setup</description>
+ <config-property-name>SetupInterval</config-property-name>
+ <config-property-type>long</config-property-type>
+ <config-property-value></config-property-value>
+ </config-property>
+ <config-property>
+ <description>How many attempts should be made when connecting the MDB</description>
+ <config-property-name>SetupAttempts</config-property-name>
+ <config-property-type>int</config-property-type>
+ <config-property-value></config-property-value>
</config-property>-->
<outbound-resourceadapter>
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-09-07 03:36:06 UTC (rev 11296)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-09-07 12:21:20 UTC (rev 11297)
@@ -408,36 +408,6 @@
}
/**
- * Get load balancing policy class name
- *
- * @return The value
- */
- public String getLoadBalancingPolicyClassName()
- {
- if (HornetQResourceAdapter.trace)
- {
- HornetQResourceAdapter.log.trace("getLoadBalancingPolicyClassName()");
- }
-
- return raProperties.getConnectionLoadBalancingPolicyClassName();
- }
-
- /**
- * Set load balancing policy class name
- *
- * @param loadBalancingPolicyClassName The value
- */
- public void setLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
- {
- if (HornetQResourceAdapter.trace)
- {
- HornetQResourceAdapter.log.trace("setLoadBalancingPolicyClassName(" + loadBalancingPolicyClassName + ")");
- }
-
- raProperties.setConnectionLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
- }
-
- /**
* Get client failure check period
*
* @return The value
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/ra/HornetQResourceAdapterConfigTest.java 2011-09-07 12:21:20 UTC (rev 11297)
@@ -0,0 +1,343 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.tests.unit.ra;
+
+import org.hornetq.ra.HornetQResourceAdapter;
+import org.hornetq.tests.util.UnitTestCase;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * This test is used to generate the commented out configs in the src/config/ra.xml. If you add a setter to the HornetQResourceAdapter
+ * this test should fail, if it does paste the new commented out configs into the ra.xml file and in here. dont forget to
+ * add a description for each new property added and try and put it in the config some where appropriate.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *
+ */
+public class HornetQResourceAdapterConfigTest extends UnitTestCase
+{
+ private static String config = "" +
+ "<config-property>\n"+
+ " <description>\n"+
+ " The transport type. Multiple connectors can be configured by using a comma separated list,\n"+
+ " i.e. org.hornetq.core.remoting.impl.invm.InVMConnectorFactory,org.hornetq.core.remoting.impl.invm.InVMConnectorFactory.\n"+
+ " </description>\n"+
+ " <config-property-name>ConnectorClassName</config-property-name>\n"+
+ " <config-property-type>java.lang.String</config-property-type>\n"+
+ " <config-property-value>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</config-property-value>\n"+
+ " </config-property>\n"+
+ " <config-property>\n"+
+ " <description>The transport configuration. These values must be in the form of key=val;key=val;,\n"+
+ " if multiple connectors are used then each set must be separated by a comma i.e. host=host1;port=5445,host=host2;port=5446.\n"+
+ " Each set of params maps to the connector classname specified.\n"+
+ " </description>\n"+
+ " <config-property-name>ConnectionParameters</config-property-name>\n"+
+ " <config-property-type>java.lang.String</config-property-type>\n"+
+ " <config-property-value>server-id=0</config-property-value>\n"+
+ " </config-property>";
+
+ private static String commentedOutConfigs = "" +
+ " <config-property>\n" +
+ " <description>Does we support HA</description>\n" +
+ " <config-property-name>HA</config-property-name>\n" +
+ " <config-property-type>java.lang.Boolean</config-property-type>\n" +
+ " <config-property-value>false</config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>Use A local Transaction instead of XA?</description>\n" +
+ " <config-property-name>UseLocalTx</config-property-name>\n" +
+ " <config-property-type>java.lang.Boolean</config-property-type>\n" +
+ " <config-property-value>false</config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The user name used to login to the JMS server</description>\n" +
+ " <config-property-name>UserName</config-property-name>\n" +
+ " <config-property-type>java.lang.String</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The password used to login to the JMS server</description>\n" +
+ " <config-property-name>Password</config-property-name>\n" +
+ " <config-property-type>java.lang.String</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The jndi params to use to look up the jms resources if local jndi is not to be used</description>\n" +
+ " <config-property-name>JndiParams</config-property-name>\n" +
+ " <config-property-type>java.lang.String</config-property-type>\n" +
+ " <config-property-value>java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory;java.naming.provider.url=jnp://localhost:1199;java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces</config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The discovery group address</description>\n" +
+ " <config-property-name>DiscoveryAddress</config-property-name>\n" +
+ " <config-property-type>java.lang.String</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The discovery group port</description>\n" +
+ " <config-property-name>DiscoveryPort</config-property-name>\n" +
+ " <config-property-type>java.lang.Integer</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The discovery refresh timeout</description>\n" +
+ " <config-property-name>DiscoveryRefreshTimeout</config-property-name>\n" +
+ " <config-property-type>java.lang.Long</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The discovery initial wait timeout</description>\n" +
+ " <config-property-name>DiscoveryInitialWaitTimeout</config-property-name>\n" +
+ " <config-property-type>java.lang.Long</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property> \n" +
+ " <config-property>\n" +
+ " <description>The class to use for load balancing connections</description>\n" +
+ " <config-property-name>ConnectionLoadBalancingPolicyClassName</config-property-name>\n" +
+ " <config-property-type>java.lang.String</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>number of reconnect attempts for connections after failover occurs</description>\n" +
+ " <config-property-name>ReconnectAttempts</config-property-name>\n" +
+ " <config-property-type>java.lang.Integer</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The client failure check period</description>\n" +
+ " <config-property-name>ClientFailureCheckPeriod</config-property-name>\n" +
+ " <config-property-type>java.lang.Long</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The connection TTL</description>\n" +
+ " <config-property-name>ConnectionTTL</config-property-name>\n" +
+ " <config-property-type>java.lang.Long</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The call timeout</description>\n" +
+ " <config-property-name>CallTimeout</config-property-name>\n" +
+ " <config-property-type>java.lang.Long</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The dups ok batch size</description>\n" +
+ " <config-property-name>DupsOKBatchSize</config-property-name>\n" +
+ " <config-property-type>java.lang.Integer</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The transaction batch size</description>\n" +
+ " <config-property-name>TransactionBatchSize</config-property-name>\n" +
+ " <config-property-type>java.lang.Integer</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The consumer window size</description>\n" +
+ " <config-property-name>ConsumerWindowSize</config-property-name>\n" +
+ " <config-property-type>java.lang.Integer</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The consumer max rate</description>\n" +
+ " <config-property-name>ConsumerMaxRate</config-property-name>\n" +
+ " <config-property-type>java.lang.Integer</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The confirmation window size</description>\n" +
+ " <config-property-name>ConfirmationWindowSize</config-property-name>\n" +
+ " <config-property-type>java.lang.Integer</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The producer max rate</description>\n" +
+ " <config-property-name>ProducerMaxRate</config-property-name>\n" +
+ " <config-property-type>java.lang.Integer</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The min large message size</description>\n" +
+ " <config-property-name>MinLargeMessageSize</config-property-name>\n" +
+ " <config-property-type>java.lang.Integer</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The block on acknowledge</description>\n" +
+ " <config-property-name>BlockOnAcknowledge</config-property-name>\n" +
+ " <config-property-type>java.lang.Boolean</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The block on non durable send</description>\n" +
+ " <config-property-name>BlockOnNonDurableSend</config-property-name>\n" +
+ " <config-property-type>java.lang.Boolean</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The block on durable send</description>\n" +
+ " <config-property-name>BlockOnDurableSend</config-property-name>\n" +
+ " <config-property-type>java.lang.Boolean</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The auto group</description>\n" +
+ " <config-property-name>AutoGroup</config-property-name>\n" +
+ " <config-property-type>java.lang.Boolean</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The pre acknowledge</description>\n" +
+ " <config-property-name>PreAcknowledge</config-property-name>\n" +
+ " <config-property-type>java.lang.Boolean</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The retry interval</description>\n" +
+ " <config-property-name>RetryInterval</config-property-name>\n" +
+ " <config-property-type>java.lang.Long</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The retry interval multiplier</description>\n" +
+ " <config-property-name>RetryIntervalMultiplier</config-property-name>\n" +
+ " <config-property-type>java.lang.Double</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>The client id</description>\n" +
+ " <config-property-name>ClientID</config-property-name>\n" +
+ " <config-property-type>java.lang.String</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>use global pools for client</description>\n" +
+ " <config-property-name>UseGlobalPools</config-property-name>\n" +
+ " <config-property-type>java.lang.Boolean</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>max number of threads for scheduled threrad pool</description>\n" +
+ " <config-property-name>ScheduledThreadPoolMaxSize</config-property-name>\n" +
+ " <config-property-type>java.lang.Integer</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>max number of threads in pool</description>\n" +
+ " <config-property-name>ThreadPoolMaxSize</config-property-name>\n" +
+ " <config-property-type>java.lang.Integer</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>whether to use jndi for looking up destinations etc</description>\n" +
+ " <config-property-name>UseJNDI</config-property-name>\n" +
+ " <config-property-type>boolean</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>how long in milliseconds to wait before retry on failed MDB setup</description>\n" +
+ " <config-property-name>SetupInterval</config-property-name>\n" +
+ " <config-property-type>long</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>\n" +
+ " <config-property>\n" +
+ " <description>How many attempts should be made when connecting the MDB</description>\n" +
+ " <config-property-name>SetupAttempts</config-property-name>\n" +
+ " <config-property-type>int</config-property-type>\n" +
+ " <config-property-value></config-property-value>\n" +
+ " </config-property>";
+
+
+ private static String rootConfig = "<root>" + config + commentedOutConfigs + "</root>";
+
+ public void testConfiguration() throws Exception
+ {
+ Method[] methods = HornetQResourceAdapter.class.getMethods();
+ Map<String,Method> methodList = new HashMap<String, Method>();
+ for (Method method : methods)
+ {
+ if(method.getName().startsWith("set"))
+ {
+ methodList.put(method.getName(), method);
+ }
+ }
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ DocumentBuilder db = dbf.newDocumentBuilder();
+ InputStream io = new ByteArrayInputStream(rootConfig.getBytes());
+ Document dom = db.parse(new InputSource(io));
+
+ Element docEle = dom.getDocumentElement();
+
+ NodeList nl = docEle.getElementsByTagName("config-property");
+
+ for(int i = 0 ; i < nl.getLength();i++)
+ {
+ Element el = (Element)nl.item(i);
+ NodeList elementsByTagName = el.getElementsByTagName("config-property-name");
+ assertEquals(el.toString(), elementsByTagName.getLength(), 1);
+ Node configPropertyNameNode = elementsByTagName.item(0);
+ String configPropertyName = configPropertyNameNode.getTextContent();
+ System.out.println("configPropertyName = " + configPropertyName);
+ Method setter = methodList.remove("set" + configPropertyName);
+ assertNotNull("setter " + configPropertyName + " does not exist", setter);
+ Class c = setter.getParameterTypes()[0];
+ elementsByTagName = el.getElementsByTagName("config-property-type");
+ assertEquals("setter " + configPropertyName + " has no type set", elementsByTagName.getLength(), 1);
+ Node configPropertyTypeNode = elementsByTagName.item(0);
+ String configPropertyTypeName = configPropertyTypeNode.getTextContent();
+ assertEquals(c.getName(), configPropertyTypeName);
+ }
+ if(!methodList.isEmpty())
+ {
+ StringBuffer newConfig = new StringBuffer(commentedOutConfigs);
+ for (Method method : methodList.values())
+ {
+ newConfig.append(" <config-property>\n");
+ newConfig.append(" <description>***add***</description>\n");
+ newConfig.append(" <config-property-name>").append(method.getName().substring(3)).append("</config-property-name>\n");
+ newConfig.append(" <config-property-type>").append(method.getParameterTypes()[0].getName()).append("</config-property-type>\n");
+ newConfig.append(" <config-property-value></config-property-value>\n");
+ newConfig.append(" </config-property>\n");
+ }
+ System.out.println(newConfig);
+ fail("methods not shown please see previous and add");
+ }
+ else
+ {
+ System.out.println(commentedOutConfigs);
+ }
+ }
+}
13 years, 4 months
JBoss hornetq SVN: r11296 - in branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v11 and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-06 23:36:06 -0400 (Tue, 06 Sep 2011)
New Revision: 11296
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
heart beat
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java 2011-09-05 14:18:21 UTC (rev 11295)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java 2011-09-07 03:36:06 UTC (rev 11296)
@@ -26,4 +26,6 @@
void replySent(StompFrame reply);
+ void requestAccepted(StompFrame request);
+
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-05 14:18:21 UTC (rev 11295)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-07 03:36:06 UTC (rev 11296)
@@ -425,6 +425,12 @@
public void handleFrame(StompFrame request)
{
StompFrame reply = null;
+
+ if (stompListener != null)
+ {
+ stompListener.requestAccepted(request);
+ }
+
try
{
if (!initialized)
@@ -446,10 +452,6 @@
if (reply != null)
{
sendFrame(reply);
- if (stompListener != null)
- {
- stompListener.replySent(reply);
- }
}
}
@@ -677,4 +679,22 @@
{
this.stompListener = listener;
}
+
+ //send a ping stomp frame
+ public void ping(StompFrame pingFrame)
+ {
+ manager.sendReply(this, pingFrame);
+ }
+
+ public void physicalSend(StompFrame frame) throws Exception
+ {
+ HornetQBuffer buffer = frame.toHornetQBuffer();
+ getTransportConnection().write(buffer, false, false);
+
+ if (stompListener != null)
+ {
+ stompListener.replySent(frame);
+ }
+
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-05 14:18:21 UTC (rev 11295)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-07 03:36:06 UTC (rev 11296)
@@ -157,8 +157,7 @@
try
{
- HornetQBuffer buffer = frame.toHornetQBuffer();
- connection.getTransportConnection().write(buffer, false, false);
+ connection.physicalSend(frame);
}
catch (Exception e)
{
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-05 14:18:21 UTC (rev 11295)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-07 03:36:06 UTC (rev 11296)
@@ -13,6 +13,7 @@
package org.hornetq.core.protocol.stomp.v11;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Message;
@@ -409,10 +410,19 @@
//kick off the pinger
startHeartBeat();
}
+
if (reply.needsDisconnect())
{
connection.destroy();
}
+ else
+ {
+ //update ping
+ if (heartBeater != null)
+ {
+ heartBeater.pinged();
+ }
+ }
}
private void startHeartBeat()
@@ -423,6 +433,13 @@
}
}
+ public StompFrame createPingFrame()
+ {
+ StompFrame frame = new StompFrame(Stomp.Commands.STOMP);
+ frame.setBody("\n");
+ return frame;
+ }
+
//server heart beat (20,100) (hard coded)
//algorithm:
//(a) server ping: if server hasn't sent any frame within serverPing
@@ -433,10 +450,10 @@
{
long serverPing = 0;
long serverAcceptPing = 0;
- long waitingTime = 0;
volatile boolean shutdown = false;
- volatile long pings = 0;
- volatile long accepts = 0;
+ AtomicLong lastPingTime = new AtomicLong(0);
+ AtomicLong lastAccepted = new AtomicLong(0);
+ StompFrame pingFrame;
public HeartBeater(long clientPing, long clientAcceptPing)
{
@@ -448,21 +465,18 @@
if (clientAcceptPing != 0)
{
serverPing = clientAcceptPing > 20 ? clientAcceptPing : 20;
- if (serverAcceptPing != 0)
- {
- waitingTime = serverPing > serverAcceptPing ? serverAcceptPing : serverPing;
- }
- else
- {
- waitingTime = serverPing;
- }
}
}
+ public void pinged()
+ {
+ lastPingTime.set(System.currentTimeMillis());
+ }
+
public void run()
{
- long lastPing = 0;
- long lastAccepted = System.currentTimeMillis();
+ lastAccepted.set(System.currentTimeMillis());
+ pingFrame = createPingFrame();
synchronized (this)
{
@@ -473,44 +487,28 @@
if (serverPing != 0)
{
- if (pings == 0)
+ dur1 = System.currentTimeMillis() - lastPingTime.get();
+ if (dur1 >= serverPing)
{
- dur1 = System.currentTimeMillis() - lastPing;
- if (dur1 >= serverPing)
- {
- lastPing = System.currentTimeMillis();
- connection.ping();
- dur1 = 0;
- }
+ lastPingTime.set(System.currentTimeMillis());
+ connection.ping(pingFrame);
+ dur1 = 0;
}
- else
- {
- dur1 = 5;
- pings = 0;
- }
}
if (serverAcceptPing != 0)
{
- if (accepts == 0)
+ dur2 = System.currentTimeMillis() - lastAccepted.get();
+ if (dur2 > (2 * serverAcceptPing))
{
- dur2 = System.currentTimeMillis() - lastAccepted;
- if (dur2 > (2 * serverAcceptPing))
- {
- connection.setValid(false);
- shutdown = true;
- break;
- }
+ connection.disconnect();
+ shutdown = true;
+ break;
}
- else
- {
- lastAccepted = System.currentTimeMillis();
- accepts = 0;
- }
}
long waitTime1 = serverPing - dur1;
- long waitTime2 = serverAcceptPing*2 - dur2;
+ long waitTime2 = serverAcceptPing * 2 - dur2;
long waitTime = waitTime1 < waitTime2 ? waitTime1 : waitTime2;
@@ -524,6 +522,20 @@
}
}
}
+
+ public void pingAccepted()
+ {
+ this.lastAccepted.set(System.currentTimeMillis());
+ }
}
+ @Override
+ public void requestAccepted(StompFrame request)
+ {
+ if (heartBeater != null)
+ {
+ heartBeater.pingAccepted();
+ }
+ }
+
}
13 years, 4 months
JBoss hornetq SVN: r11295 - in branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v11 and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-05 10:18:21 -0400 (Mon, 05 Sep 2011)
New Revision: 11295
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
more coding
Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public interface FrameEventListener
+{
+
+ void replySent(StompFrame reply);
+
+}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -15,6 +15,10 @@
import java.util.ArrayList;
import java.util.List;
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
public class HornetQStompException extends Exception {
private static final long serialVersionUID = -274452327574950068L;
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java 2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -168,6 +168,8 @@
//1.1
String ACCEPT_VERSION = "accept-version";
String HOST = "host";
+
+ Object HEART_BEAT = "heart-beat";
}
public interface Error
@@ -189,6 +191,8 @@
String VERSION = "version";
String SERVER = "server";
+
+ String HEART_BEAT = "heart-beat";
}
public interface Ack
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -76,6 +76,8 @@
private VersionedStompFrameHandler frameHandler;
private boolean initialized;
+
+ private FrameEventListener stompListener;
public StompDecoder getDecoder()
{
@@ -444,6 +446,10 @@
if (reply != null)
{
sendFrame(reply);
+ if (stompListener != null)
+ {
+ stompListener.replySent(reply);
+ }
}
}
@@ -662,8 +668,13 @@
}
public StompFrame createStompMessage(ServerMessage serverMessage,
- StompSubscription subscription, int deliveryCount)
+ StompSubscription subscription, int deliveryCount) throws Exception
{
return frameHandler.createMessageFrame(serverMessage, subscription, deliveryCount);
}
+
+ public void addStompEventListener(FrameEventListener listener)
+ {
+ this.stompListener = listener;
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -56,10 +56,18 @@
private int size;
+ private boolean disconnect;
+
public StompFrame(String command)
{
+ this(command, false);
+ }
+
+ public StompFrame(String command, boolean disconnect)
+ {
this.command = command;
this.headers = new LinkedHashMap<String, String>();
+ this.disconnect = disconnect;
}
public String getCommand()
@@ -179,4 +187,9 @@
}
return new byte[0];
}
+
+ public boolean needsDisconnect()
+ {
+ return disconnect;
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -105,7 +105,7 @@
}
catch (Exception e)
{
- e.printStackTrace();
+ log.error("Error delivering stomp messages", e);
return 0;
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -12,7 +12,6 @@
*/
package org.hornetq.core.protocol.stomp.v11;
-import java.io.UnsupportedEncodingException;
import java.util.Map;
import org.hornetq.api.core.HornetQBuffer;
@@ -20,6 +19,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.FrameEventListener;
import org.hornetq.core.protocol.stomp.HornetQStompException;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompConnection;
@@ -27,20 +27,24 @@
import org.hornetq.core.protocol.stomp.StompSubscription;
import org.hornetq.core.protocol.stomp.StompUtils;
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
-import org.hornetq.core.protocol.stomp.Stomp.Headers;
-import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.utils.DataConstants;
-public class StompFrameHandlerV11 extends VersionedStompFrameHandler
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements FrameEventListener
{
private static final Logger log = Logger.getLogger(StompFrameHandlerV11.class);
+
+ private HeartBeater heartBeater;
public StompFrameHandlerV11(StompConnection connection)
{
this.connection = connection;
+ connection.addStompEventListener(this);
}
@Override
@@ -53,43 +57,81 @@
String clientID = headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = headers.get(Stomp.Headers.Connect.REQUEST_ID);
- if (connection.validateUser(login, passcode))
+ try
{
- connection.setClientID(clientID);
- connection.setValid(true);
-
- response = new StompFrame(Stomp.Responses.CONNECTED);
-
- //version
- response.addHeader(Stomp.Headers.Connected.VERSION, connection.getVersion());
-
- //session
- response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
-
- //server
- response.addHeader(Stomp.Headers.Connected.SERVER, connection.getHornetQServerName());
-
- if (requestID != null)
+ if (connection.validateUser(login, passcode))
{
- response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+ connection.setClientID(clientID);
+ connection.setValid(true);
+
+ response = new StompFrame(Stomp.Responses.CONNECTED);
+
+ // version
+ response.addHeader(Stomp.Headers.Connected.VERSION,
+ connection.getVersion());
+
+ // session
+ response.addHeader(Stomp.Headers.Connected.SESSION, connection
+ .getID().toString());
+
+ // server
+ response.addHeader(Stomp.Headers.Connected.SERVER,
+ connection.getHornetQServerName());
+
+ if (requestID != null)
+ {
+ response.addHeader(Stomp.Headers.Connected.RESPONSE_ID,
+ requestID);
+ }
+
+ // heart-beat. We need to start after connected frame has been sent.
+ // otherwise the client may receive heart-beat before it receives
+ // connected frame.
+ String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
+
+ if (heartBeat != null)
+ {
+ handleHeartBeat(heartBeat);
+ response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "20,100");
+ }
}
+ else
+ {
+ // not valid
+ response = new StompFrame(Stomp.Responses.ERROR, true);
+ response.addHeader(Stomp.Headers.Error.VERSION, "1.0,1.1");
+
+ response.setBody("Supported protocol versions are 1.0 and 1.1");
+ }
}
- else
+ catch (HornetQStompException e)
{
- //not valid
- response = new StompFrame(Stomp.Responses.ERROR);
- response.addHeader(Stomp.Headers.Error.VERSION, "1.0,1.1");
-
- response.setBody("Supported protocol versions are 1.0 and 1.1");
-
- connection.sendFrame(response);
- connection.destroy();
-
- return null;
+ response = e.getFrame();
}
return response;
}
+ //ping parameters, hard-code for now
+ //the server can support min 20 milliseconds and receive ping at 100 milliseconds (20,100)
+ private void handleHeartBeat(String heartBeatHeader) throws HornetQStompException
+ {
+ String[] params = heartBeatHeader.split(",");
+ if (params.length != 2)
+ {
+ throw new HornetQStompException("Incorrect heartbeat header " + heartBeatHeader);
+ }
+
+ //client ping
+ long minPingInterval = Long.valueOf(params[0]);
+ //client receive ping
+ long minAcceptInterval = Long.valueOf(params[1]);
+
+ if ((minPingInterval != 0) || (minAcceptInterval != 0))
+ {
+ heartBeater = new HeartBeater(minPingInterval, minAcceptInterval);
+ }
+ }
+
@Override
public StompFrame onDisconnect(StompFrame frame)
{
@@ -359,4 +401,129 @@
}
+ @Override
+ public void replySent(StompFrame reply)
+ {
+ if (reply.getCommand().equals(Stomp.Responses.CONNECTED))
+ {
+ //kick off the pinger
+ startHeartBeat();
+ }
+ if (reply.needsDisconnect())
+ {
+ connection.destroy();
+ }
+ }
+
+ private void startHeartBeat()
+ {
+ if (heartBeater != null)
+ {
+ heartBeater.start();
+ }
+ }
+
+ //server heart beat (20,100) (hard coded)
+ //algorithm:
+ //(a) server ping: if server hasn't sent any frame within serverPing
+ //interval, send a ping.
+ //(b) accept ping: if server hasn't received any frame within
+ // 2*serverAcceptPing, disconnect!
+ private class HeartBeater extends Thread
+ {
+ long serverPing = 0;
+ long serverAcceptPing = 0;
+ long waitingTime = 0;
+ volatile boolean shutdown = false;
+ volatile long pings = 0;
+ volatile long accepts = 0;
+
+ public HeartBeater(long clientPing, long clientAcceptPing)
+ {
+ if (clientPing != 0)
+ {
+ serverAcceptPing = clientPing > 100 ? clientPing : 100;
+ }
+
+ if (clientAcceptPing != 0)
+ {
+ serverPing = clientAcceptPing > 20 ? clientAcceptPing : 20;
+ if (serverAcceptPing != 0)
+ {
+ waitingTime = serverPing > serverAcceptPing ? serverAcceptPing : serverPing;
+ }
+ else
+ {
+ waitingTime = serverPing;
+ }
+ }
+ }
+
+ public void run()
+ {
+ long lastPing = 0;
+ long lastAccepted = System.currentTimeMillis();
+
+ synchronized (this)
+ {
+ while (!shutdown)
+ {
+ long dur1 = 0;
+ long dur2 = 0;
+
+ if (serverPing != 0)
+ {
+ if (pings == 0)
+ {
+ dur1 = System.currentTimeMillis() - lastPing;
+ if (dur1 >= serverPing)
+ {
+ lastPing = System.currentTimeMillis();
+ connection.ping();
+ dur1 = 0;
+ }
+ }
+ else
+ {
+ dur1 = 5;
+ pings = 0;
+ }
+ }
+
+ if (serverAcceptPing != 0)
+ {
+ if (accepts == 0)
+ {
+ dur2 = System.currentTimeMillis() - lastAccepted;
+ if (dur2 > (2 * serverAcceptPing))
+ {
+ connection.setValid(false);
+ shutdown = true;
+ break;
+ }
+ }
+ else
+ {
+ lastAccepted = System.currentTimeMillis();
+ accepts = 0;
+ }
+ }
+
+ long waitTime1 = serverPing - dur1;
+ long waitTime2 = serverAcceptPing*2 - dur2;
+
+ long waitTime = waitTime1 < waitTime2 ? waitTime1 : waitTime2;
+
+ try
+ {
+ this.wait(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ }
+ }
+
}
13 years, 4 months
JBoss hornetq SVN: r11294 - in branches/STOMP11/tests/stomp-tests: v10/src/test/java/org and 10 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-04 20:37:40 -0400 (Sun, 04 Sep 2011)
New Revision: 11294
Added:
branches/STOMP11/tests/stomp-tests/v10/src/test/java/org/
branches/STOMP11/tests/stomp-tests/v10/src/test/java/org/hornetq/
branches/STOMP11/tests/stomp-tests/v10/src/test/java/org/hornetq/stomp/
branches/STOMP11/tests/stomp-tests/v10/src/test/java/org/hornetq/stomp/tests/
branches/STOMP11/tests/stomp-tests/v10/src/test/java/org/hornetq/stomp/tests/v10/
branches/STOMP11/tests/stomp-tests/v10/src/test/java/org/hornetq/stomp/tests/v10/StompConnectionTest.java
branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/
branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/
branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/
branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/
branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/HornetQStompTestCase.java
branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/HornetQTestServer.java
branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/HornetQTestServerManager.java
branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/v11/
branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/v11/StompConnectionTest.java
Log:
test
Added: branches/STOMP11/tests/stomp-tests/v10/src/test/java/org/hornetq/stomp/tests/v10/StompConnectionTest.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/v10/src/test/java/org/hornetq/stomp/tests/v10/StompConnectionTest.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/v10/src/test/java/org/hornetq/stomp/tests/v10/StompConnectionTest.java 2011-09-05 00:37:40 UTC (rev 11294)
@@ -0,0 +1,6 @@
+package org.hornetq.stomp.tests.v10;
+
+public class StompConnectionTest
+{
+}
+
Added: branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/HornetQStompTestCase.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/HornetQStompTestCase.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/HornetQStompTestCase.java 2011-09-05 00:37:40 UTC (rev 11294)
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests;
+
+import junit.framework.TestCase;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class HornetQStompTestCase extends TestCase
+{
+ protected HornetQTestServerManager serverManager = new HornetQTestServerManager();
+ protected int numberOfServers = 1;
+
+ protected void setUp()
+ {
+ for (int i = 0; i < numberOfServers; i++)
+ {
+ serverManager.createServer(i);
+ }
+ }
+
+ protected void tearDown()
+ {
+ for (int i = 0; i < numberOfServers; i++)
+ {
+ serverManager.shutdownServer(i);
+ }
+ }
+}
Added: branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/HornetQTestServer.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/HornetQTestServer.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/HornetQTestServer.java 2011-09-05 00:37:40 UTC (rev 11294)
@@ -0,0 +1,69 @@
+package org.hornetq.stomp.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class HornetQTestServer
+{
+
+ public HornetQTestServer(int serverIndex)
+ {
+
+ }
+
+ public void start() throws IOException
+ {
+ String[] cmdArray = assembleCommandArray();
+ String[] envp = assembleEnvp();
+ File dir = getWorkingDir();
+
+ Process p = Runtime.getRuntime().exec(cmdArray, envp, dir);
+ }
+
+ private String[] assembleCommandArray()
+ {
+ //java
+ String javaHome = System.getProperty("java.home");
+ String javaPath = javaHome + File.separator + "bin" + File.separator + "java";
+
+ List<String> fullCommand = new ArrayList<String>();
+ fullCommand.add("\"" + javaPath + "\"");
+
+ //classpath
+ fullCommand.add("-cp");
+ //hornetq jars
+ fullCommand.addAll(getHornetQServerRuntimeJars());
+ fullCommand.addAll(getTestClassBuildDirs());
+
+
+ return null;
+ }
+
+ private Collection<? extends String> getHornetQServerRuntimeJars()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ private String[] assembleEnvp()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ private File getWorkingDir()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void shutdown()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Added: branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/HornetQTestServerManager.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/HornetQTestServerManager.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/HornetQTestServerManager.java 2011-09-05 00:37:40 UTC (rev 11294)
@@ -0,0 +1,27 @@
+package org.hornetq.stomp.tests;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HornetQTestServerManager
+{
+
+ List<HornetQTestServer> serverList = new ArrayList<HornetQTestServer>();
+ Map<Integer, HornetQTestServer> servers = new LinkedHashMap<Integer, HornetQTestServer>();
+
+ public void createServer(int serverId)
+ {
+ HornetQTestServer server = new HornetQTestServer(serverId);
+ server.start();
+ servers.put(serverId, server);
+ }
+
+ public void shutdownServer(int serverIndex)
+ {
+ HornetQTestServer server = servers.remove(serverIndex);
+ server.shutdown();
+ }
+
+}
Added: branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/v11/StompConnectionTest.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/v11/StompConnectionTest.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/v11/src/test/java/org/hornetq/stomp/tests/v11/StompConnectionTest.java 2011-09-05 00:37:40 UTC (rev 11294)
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.v11;
+
+import org.hornetq.stomp.tests.HornetQStompTestCase;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class StompConnectionTest extends HornetQStompTestCase
+{
+}
+
13 years, 4 months
JBoss hornetq SVN: r11293 - in branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v10 and 1 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-04 20:34:29 -0400 (Sun, 04 Sep 2011)
New Revision: 11293
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
Log:
curr work
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -1,3 +1,15 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
package org.hornetq.core.protocol.stomp;
import java.util.ArrayList;
@@ -35,6 +47,11 @@
this.body = body;
}
+ public StompFrame getFrame()
+ {
+ return null;
+ }
+
private class Header
{
public String key;
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java 2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -40,12 +40,6 @@
String UNSUBSCRIBE = "UNSUBSCRIBE";
- String BEGIN_TRANSACTION = "BEGIN";
-
- String COMMIT_TRANSACTION = "COMMIT";
-
- String ABORT_TRANSACTION = "ABORT";
-
String BEGIN = "BEGIN";
String COMMIT = "COMMIT";
@@ -53,6 +47,11 @@
String ABORT = "ABORT";
String ACK = "ACK";
+
+ //1.1
+ String NACK = "NACK";
+
+ String STOMP = "STOMP";
}
public interface Responses
@@ -76,6 +75,10 @@
String CONTENT_LENGTH = "content-length";
+ String ACCEPT_VERSION = "accept-version";
+
+ String CONTENT_TYPE = "content-type";
+
public interface Response
{
String RECEIPT_ID = "receipt-id";
@@ -140,6 +143,8 @@
String AUTO = "auto";
String CLIENT = "client";
+
+ String CLIENT_INDIVIDUAL = "client-individual";
}
}
@@ -167,7 +172,11 @@
public interface Error
{
+ //1.0 only
String MESSAGE = "message";
+
+ //1.1
+ String VERSION = "version";
}
public interface Connected
@@ -175,11 +184,19 @@
String SESSION = "session";
String RESPONSE_ID = "response-id";
+
+ //1.1
+ String VERSION = "version";
+
+ String SERVER = "server";
}
public interface Ack
{
String MESSAGE_ID = "message-id";
+
+ //1.1
+ String SUBSCRIPTION = "subscription";
}
}
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -27,6 +27,8 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.remoting.Connection;
@@ -39,8 +41,9 @@
*/
public class StompConnection implements RemotingConnection
{
-
private static final Logger log = Logger.getLogger(StompConnection.class);
+
+ protected static final String CONNECTION_ID_PROP = "__HQ_CID";
private final StompProtocolManager manager;
@@ -68,7 +71,11 @@
private volatile boolean dataReceived;
- private StompVersions version = StompVersions.V1_0;
+ private StompVersions version;
+
+ private VersionedStompFrameHandler frameHandler;
+
+ private boolean initialized;
public StompDecoder getDecoder()
{
@@ -199,10 +206,6 @@
manager.cleanup(this);
}
- public void disconnect()
- {
- }
-
public void fail(final HornetQException me)
{
synchronized (failLock)
@@ -358,8 +361,10 @@
* accept-version value takes form of "v1,v2,v3..."
* we need to return the highest supported version
*/
- public void negotiateVersion(String acceptVersion) throws HornetQStompException
+ public void negotiateVersion(StompFrame frame) throws HornetQStompException
{
+ String acceptVersion = frame.getHeader(Stomp.Headers.ACCEPT_VERSION);
+
if (acceptVersion == null)
{
this.version = StompVersions.V1_0;
@@ -391,6 +396,9 @@
throw error;
}
}
+
+ this.frameHandler = VersionedStompFrameHandler.getHandler(this, this.version);
+ this.initialized = true;
}
//reject if the host doesn't match
@@ -411,4 +419,251 @@
throw error;
}
}
+
+ public void handleFrame(StompFrame request)
+ {
+ StompFrame reply = null;
+ try
+ {
+ if (!initialized)
+ {
+ if (!Stomp.Commands.CONNECT.equals(request.getCommand()))
+ {
+ throw new HornetQStompException("Connection hasn't been established.");
+ }
+ //decide version
+ negotiateVersion(request);
+ }
+ reply = frameHandler.handleFrame(request);
+ }
+ catch (HornetQStompException e)
+ {
+ reply = e.getFrame();
+ }
+
+ if (reply != null)
+ {
+ sendFrame(reply);
+ }
+ }
+
+ public void sendFrame(StompFrame frame)
+ {
+ manager.sendReply(this, frame);
+ }
+
+ public boolean validateUser(String login, String passcode)
+ {
+ this.valid = manager.validateUser(login, passcode);
+ if (valid)
+ {
+ this.login = login;
+ this.passcode = passcode;
+ }
+ return valid;
+ }
+
+ public ServerMessageImpl createServerMessage()
+ {
+ return manager.createServerMessage();
+ }
+
+ public StompSession getSession(String txID) throws HornetQStompException
+ {
+ StompSession session = null;
+ try
+ {
+ if (txID == null)
+ {
+ session = manager.getSession(this);
+ }
+ else
+ {
+ session = manager.getTransactedSession(this, txID);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Exception getting session", e);
+ }
+
+ return session;
+ }
+
+ public void validate() throws HornetQStompException
+ {
+ if (!this.valid)
+ {
+ throw new HornetQStompException("Connection is not valid.");
+ }
+ }
+
+ public void sendServerMessage(ServerMessageImpl message, String txID) throws HornetQStompException
+ {
+ StompSession stompSession = getSession(txID);
+
+ if (stompSession.isNoLocal())
+ {
+ message.putStringProperty(CONNECTION_ID_PROP, getID().toString());
+ }
+ try
+ {
+ stompSession.getSession().send(message, true);
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error sending message " + message, e);
+ }
+ }
+
+ @Override
+ public void disconnect()
+ {
+ destroy();
+ }
+
+ public void beginTransaction(String txID) throws HornetQStompException
+ {
+ try
+ {
+ manager.beginTransaction(this, txID);
+ }
+ catch (HornetQStompException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error beginning a transaction: " + txID, e);
+ }
+ }
+
+ public void commitTransaction(String txID) throws HornetQStompException
+ {
+ try
+ {
+ manager.commitTransaction(this, txID);
+ }
+ catch (HornetQStompException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error committing " + txID, e);
+ }
+ }
+
+ public void abortTransaction(String txID) throws HornetQStompException
+ {
+ try
+ {
+ manager.abortTransaction(this, txID);
+ }
+ catch (HornetQStompException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error aborting " + txID, e);
+ }
+ }
+
+ public void subscribe(String destination, String selector, String ack,
+ String id, String durableSubscriptionName, boolean noLocal) throws HornetQStompException
+ {
+ if (noLocal)
+ {
+ String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'";
+ if (selector == null)
+ {
+ selector = noLocalFilter;
+ }
+ else
+ {
+ selector += " AND " + noLocalFilter;
+ }
+ }
+ if (ack == null)
+ {
+ ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
+ }
+
+ String subscriptionID = null;
+ if (id != null)
+ {
+ subscriptionID = id;
+ }
+ else
+ {
+ if (destination == null)
+ {
+ throw new HornetQStompException("Client must set destination or id header to a SUBSCRIBE command");
+ }
+ subscriptionID = "subscription/" + destination;
+ }
+
+ try
+ {
+ manager.createSubscription(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal);
+ }
+ catch (HornetQStompException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error creating subscription " + subscriptionID, e);
+ }
+ }
+
+ public void unsubscribe(String subscriptionID) throws HornetQStompException
+ {
+ try
+ {
+ manager.unsubscribe(this, subscriptionID);
+ }
+ catch (HornetQStompException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error unsubscripting " + subscriptionID, e);
+ }
+ }
+
+ public void acknowledge(String messageID, String subscriptionID) throws HornetQStompException
+ {
+ try
+ {
+ manager.acknowledge(this, messageID, subscriptionID);
+ }
+ catch (HornetQStompException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error acknowledging message " + messageID, e);
+ }
+ }
+
+ public String getVersion()
+ {
+ return String.valueOf(version);
+ }
+
+ public String getHornetQServerName()
+ {
+ //hard coded, review later.
+ return "HornetQ/2.2.5 HornetQ Messaging Engine";
+ }
+
+ public StompFrame createStompMessage(ServerMessage serverMessage,
+ StompSubscription subscription, int deliveryCount)
+ {
+ return frameHandler.createMessageFrame(serverMessage, subscription, deliveryCount);
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -17,6 +17,10 @@
*/
package org.hornetq.core.protocol.stomp;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -31,7 +35,7 @@
* @author Tim Fox
*
*/
-class StompFrame
+public class StompFrame
{
private static final Logger log = Logger.getLogger(StompFrame.class);
@@ -39,45 +43,30 @@
private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
- private final String command;
+ private String command;
- private final Map<String, Object> headers;
+ private Map<String, String> headers;
+
+ //stomp 1.1 talks about repetitive headers.
+ private List<Header> allHeaders = new ArrayList<Header>();
- private final byte[] content;
+ private String body;
private HornetQBuffer buffer = null;
private int size;
- public StompFrame(String command, Map<String, Object> headers, byte[] data)
+ public StompFrame(String command)
{
this.command = command;
- this.headers = headers;
- this.content = data;
+ this.headers = new LinkedHashMap<String, String>();
}
- public StompFrame(String command, Map<String, Object> headers)
- {
- this.command = command;
- this.headers = headers;
- this.content = NO_DATA;
- }
-
public String getCommand()
{
return command;
}
- public byte[] getContent()
- {
- return content;
- }
-
- public Map<String, Object> getHeaders()
- {
- return headers;
- }
-
public int getEncodedSize() throws Exception
{
if (buffer == null)
@@ -90,18 +79,18 @@
@Override
public String toString()
{
- return "StompFrame[command=" + command + ", headers=" + headers + ", content-length=" + content.length + "]";
+ return "StompFrame[command=" + command + ", headers=" + headers + ", content-length=";
}
public String asString()
{
String out = command + '\n';
- for (Entry<String, Object> header : headers.entrySet())
+ for (Entry<String, String> header : headers.entrySet())
{
out += header.getKey() + ": " + header.getValue() + '\n';
}
out += '\n';
- out += new String(content);
+ out += body;
return out;
}
@@ -116,7 +105,7 @@
head.append(command);
head.append(Stomp.NEWLINE);
// Output the headers.
- for (Map.Entry<String, Object> header : headers.entrySet())
+ for (Map.Entry<String, String> header : headers.entrySet())
{
head.append(header.getKey());
head.append(Stomp.Headers.SEPARATOR);
@@ -134,4 +123,60 @@
}
return buffer;
}
+
+ public String getHeader(String key)
+ {
+ return headers.get(key);
+ }
+
+ public void addHeader(String key, String val)
+ {
+ if (!headers.containsKey(key))
+ {
+ headers.put(key, val);
+ }
+ allHeaders.add(new Header(key, val));
+ }
+
+ public Map<String, String> getHeadersMap()
+ {
+ return headers;
+ }
+
+ private class Header
+ {
+ public String key;
+ public String val;
+
+ public Header(String key, String val)
+ {
+ this.key = key;
+ this.val = val;
+ }
+ }
+
+ public void setBody(String body)
+ {
+ this.body = body;
+ }
+
+ public boolean hasHeader(String key)
+ {
+ return headers.containsKey(key);
+ }
+
+ public String getBody()
+ {
+ return body;
+ }
+
+ //Since 1.1, there is a content-type header that needs to take care of
+ public byte[] getBodyAsBytes() throws UnsupportedEncodingException
+ {
+ if (body != null)
+ {
+ return body.getBytes("UTF-8");
+ }
+ return new byte[0];
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -13,10 +13,6 @@
package org.hornetq.core.protocol.stomp;
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -27,8 +23,6 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
@@ -53,9 +47,6 @@
private static final Logger log = Logger.getLogger(StompProtocolManager.class);
- // TODO use same value than HornetQConnection
- private static final String CONNECTION_ID_PROP = "__HQ_CID";
-
// Attributes ----------------------------------------------------
private final HornetQServer server;
@@ -69,36 +60,6 @@
// Static --------------------------------------------------------
- private static StompFrame createError(Exception e, StompFrame request)
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try
- {
- // Let the stomp client know about any protocol errors.
- PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
- e.printStackTrace(stream);
- stream.close();
-
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
-
- final String receiptId = (String)request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
- if (receiptId != null)
- {
- headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
- }
-
- byte[] payload = baos.toByteArray();
- headers.put(Stomp.Headers.CONTENT_LENGTH, payload.length);
- return new StompFrame(Stomp.Responses.ERROR, headers, payload);
- }
- catch (UnsupportedEncodingException ex)
- {
- log.warn("Unable to create ERROR frame from the exception", ex);
- return null;
- }
- }
-
// Constructors --------------------------------------------------
public StompProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
@@ -143,19 +104,15 @@
public void handleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
{
- long start = System.nanoTime();
StompConnection conn = (StompConnection)connection;
conn.setDataReceived();
StompDecoder decoder = conn.getDecoder();
-
- // log.info("in handle");
do
{
StompFrame request;
-
try
{
request = decoder.decode(buffer);
@@ -163,7 +120,6 @@
catch (Exception e)
{
log.error("Failed to decode", e);
-
return;
}
@@ -174,93 +130,13 @@
try
{
- String command = request.getCommand();
-
- StompFrame response = null;
-
- if (Stomp.Commands.CONNECT.equals(command))
- {
- response = onConnect(request, conn);
- }
- else if (Stomp.Commands.DISCONNECT.equals(command))
- {
- response = onDisconnect(request, conn);
- }
- else if (Stomp.Commands.SEND.equals(command))
- {
- response = onSend(request, conn);
- }
- else if (Stomp.Commands.SUBSCRIBE.equals(command))
- {
- response = onSubscribe(request, conn);
- }
- else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
- {
- response = onUnsubscribe(request, conn);
- }
- else if (Stomp.Commands.ACK.equals(command))
- {
- response = onAck(request, conn);
- }
- else if (Stomp.Commands.BEGIN.equals(command))
- {
- response = onBegin(request, server, conn);
- }
- else if (Stomp.Commands.COMMIT.equals(command))
- {
- response = onCommit(request, conn);
- }
- else if (Stomp.Commands.ABORT.equals(command))
- {
- response = onAbort(request, conn);
- }
- else
- {
- log.error("Unsupported Stomp frame: " + request);
- response = new StompFrame(Stomp.Responses.ERROR,
- new HashMap<String, Object>(),
- ("Unsupported frame: " + command).getBytes());
- }
-
- if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
- {
- if (response == null)
- {
- Map<String, Object> h = new HashMap<String, Object>();
- response = new StompFrame(Stomp.Responses.RECEIPT, h);
- }
- response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
- request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
- }
-
- if (response != null)
- {
- sendReply(conn, response);
- }
-
- if (Stomp.Commands.DISCONNECT.equals(command))
- {
- conn.destroy();
- }
+ conn.handleFrame(request);
}
- catch (Exception e)
- {
- e.printStackTrace();
- StompFrame error = createError(e, request);
- if (error != null)
- {
- sendReply(conn, error);
- }
- }
finally
{
server.getStorageManager().clearContext();
}
} while (decoder.hasBytes());
-
- long end = System.nanoTime();
-
- // log.info("handle took " + (end-start));
}
// Public --------------------------------------------------------
@@ -297,182 +173,8 @@
// Private -------------------------------------------------------
- private StompFrame onSubscribe(StompFrame frame, StompConnection connection) throws Exception
+ public StompSession getSession(StompConnection connection) throws Exception
{
- Map<String, Object> headers = frame.getHeaders();
- String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
- String selector = (String)headers.get(Stomp.Headers.Subscribe.SELECTOR);
- String ack = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
- String id = (String)headers.get(Stomp.Headers.Subscribe.ID);
- String durableSubscriptionName = (String)headers.get(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
- boolean noLocal = false;
- if (headers.containsKey(Stomp.Headers.Subscribe.NO_LOCAL))
- {
- noLocal = Boolean.parseBoolean((String)headers.get(Stomp.Headers.Subscribe.NO_LOCAL));
- }
- if (noLocal)
- {
- String noLocalFilter = CONNECTION_ID_PROP + " <> '" + connection.getID().toString() + "'";
- if (selector == null)
- {
- selector = noLocalFilter;
- }
- else
- {
- selector += " AND " + noLocalFilter;
- }
- }
- if (ack == null)
- {
- ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
- }
- String subscriptionID = null;
- if (id != null)
- {
- subscriptionID = id;
- }
- else
- {
- if (destination == null)
- {
- throw new StompException("Client must set destination or id header to a SUBSCRIBE command");
- }
- subscriptionID = "subscription/" + destination;
- }
- StompSession stompSession = getSession(connection);
- stompSession.setNoLocal(noLocal);
- if (stompSession.containsSubscription(subscriptionID))
- {
- throw new StompException("There already is a subscription for: " + subscriptionID +
- ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
- }
- long consumerID = server.getStorageManager().generateUniqueID();
- String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
- stompSession.addSubscription(consumerID,
- subscriptionID,
- clientID,
- durableSubscriptionName,
- destination,
- selector,
- ack);
-
- return null;
- }
-
- private StompFrame onUnsubscribe(StompFrame frame, StompConnection connection) throws Exception
- {
- Map<String, Object> headers = frame.getHeaders();
- String destination = (String)headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
- String id = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
-
- String subscriptionID = null;
- if (id != null)
- {
- subscriptionID = id;
- }
- else
- {
- if (destination == null)
- {
- throw new StompException("Must specify the subscription's id or the destination you are unsubscribing from");
- }
- subscriptionID = "subscription/" + destination;
- }
-
- StompSession stompSession = getSession(connection);
- boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
- if (!unsubscribed)
- {
- throw new StompException("Cannot unsubscribe as no subscription exists for id: " + subscriptionID);
- }
- return null;
- }
-
- private StompFrame onAck(StompFrame frame, StompConnection connection) throws Exception
- {
- Map<String, Object> headers = frame.getHeaders();
- String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
- String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
- StompSession stompSession = null;
- if (txID != null)
- {
- log.warn("Transactional acknowledgement is not supported");
- }
- stompSession = getSession(connection);
- stompSession.acknowledge(messageID);
-
- return null;
- }
-
- private StompFrame onBegin(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
- {
- Map<String, Object> headers = frame.getHeaders();
- String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- throw new StompException("transaction header is mandatory to BEGIN a transaction");
- }
- if (transactedSessions.containsKey(txID))
- {
- throw new StompException("Transaction already started: " + txID);
- }
- // create the transacted session
- getTransactedSession(connection, txID);
-
- return null;
- }
-
- private StompFrame onCommit(StompFrame frame, StompConnection connection) throws Exception
- {
- Map<String, Object> headers = frame.getHeaders();
- String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- throw new StompException("transaction header is mandatory to COMMIT a transaction");
- }
-
- StompSession session = getTransactedSession(connection, txID);
- if (session == null)
- {
- throw new StompException("No transaction started: " + txID);
- }
- transactedSessions.remove(txID);
- session.getSession().commit();
-
- return null;
- }
-
- private StompFrame onAbort(StompFrame frame, StompConnection connection) throws Exception
- {
- Map<String, Object> headers = frame.getHeaders();
- String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- throw new StompException("transaction header is mandatory to ABORT a transaction");
- }
-
- StompSession session = getTransactedSession(connection, txID);
-
- if (session == null)
- {
- throw new StompException("No transaction started: " + txID);
- }
- transactedSessions.remove(txID);
- session.getSession().rollback(false);
-
- return null;
- }
-
- private void checkConnected(StompConnection connection) throws StompException
- {
- if (!connection.isValid())
- {
- throw new StompException("Not connected");
- }
- }
-
- private StompSession getSession(StompConnection connection) throws Exception
- {
StompSession stompSession = sessions.get(connection.getID());
if (stompSession == null)
{
@@ -497,7 +199,7 @@
return stompSession;
}
- private StompSession getTransactedSession(StompConnection connection, String txID) throws Exception
+ public StompSession getTransactedSession(StompConnection connection, String txID) throws Exception
{
StompSession stompSession = transactedSessions.get(txID);
if (stompSession == null)
@@ -522,89 +224,6 @@
return stompSession;
}
- private StompFrame onDisconnect(StompFrame frame, StompConnection connection) throws Exception
- {
- cleanup(connection);
- return null;
- }
-
- private StompFrame onSend(StompFrame frame, StompConnection connection) throws Exception
- {
- checkConnected(connection);
- Map<String, Object> headers = frame.getHeaders();
- String destination = (String)headers.remove(Stomp.Headers.Send.DESTINATION);
- String txID = (String)headers.remove(Stomp.Headers.TRANSACTION);
- long timestamp = System.currentTimeMillis();
-
- ServerMessageImpl message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
- message.setTimestamp(timestamp);
- message.setAddress(SimpleString.toSimpleString(destination));
- StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
- if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
- {
- message.setType(Message.BYTES_TYPE);
- message.getBodyBuffer().writeBytes(frame.getContent());
- }
- else
- {
- message.setType(Message.TEXT_TYPE);
- String text = new String(frame.getContent(), "UTF-8");
- message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
- }
-
- StompSession stompSession = null;
- if (txID == null)
- {
- stompSession = getSession(connection);
- }
- else
- {
- stompSession = getTransactedSession(connection, txID);
- }
- if (stompSession.isNoLocal())
- {
- message.putStringProperty(CONNECTION_ID_PROP, connection.getID().toString());
- }
- stompSession.getSession().send(message, true);
-
- return null;
- }
-
- private StompFrame onConnect(StompFrame frame, final StompConnection connection) throws Exception
- {
- Map<String, Object> headers = frame.getHeaders();
- String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
- String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
- String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
- String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
- //since 1.1
- String acceptVersion = (String)headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
- String host = (String)headers.get(Stomp.Headers.Connect.HOST);
-
- HornetQSecurityManager sm = server.getSecurityManager();
-
- // The sm will be null case security is not enabled...
- if (sm != null)
- {
- sm.validateUser(login, passcode);
- }
-
- connection.negotiateVersion(acceptVersion);
- connection.setHost(host);
- connection.setLogin(login);
- connection.setPasscode(passcode);
- connection.setClientID(clientID);
- connection.setValid(true);
-
- HashMap<String, Object> h = new HashMap<String, Object>();
- h.put(Stomp.Headers.Connected.SESSION, connection.getID());
- if (requestID != null)
- {
- h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
- }
- return new StompFrame(Stomp.Responses.CONNECTED, h);
- }
-
public void cleanup(final StompConnection connection)
{
connection.setValid(false);
@@ -652,15 +271,18 @@
});
}
- private void sendReply(final StompConnection connection, final StompFrame frame)
+ public void sendReply(final StompConnection connection, final StompFrame frame)
{
server.getStorageManager().afterCompleteOperations(new IOAsyncTask()
{
public void onError(final int errorCode, final String errorMessage)
{
log.warn("Error processing IOCallback code = " + errorCode + " message = " + errorMessage);
+
+ HornetQStompException e = new HornetQStompException("Error sending reply",
+ new HornetQException(errorCode, errorMessage));
- StompFrame error = createError(new HornetQException(errorCode, errorMessage), frame);
+ StompFrame error = e.getFrame();
send(connection, error);
}
@@ -681,5 +303,95 @@
return "hornetq";
}
+ public boolean validateUser(String login, String passcode)
+ {
+ boolean validated = true;
+
+ HornetQSecurityManager sm = server.getSecurityManager();
+
+ // The sm will be null case security is not enabled...
+ if (sm != null)
+ {
+ validated = sm.validateUser(login, passcode);
+ }
+
+ return validated;
+ }
+
+ public ServerMessageImpl createServerMessage()
+ {
+ return new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+ }
+
+ public void commitTransaction(StompConnection connection, String txID) throws Exception
+ {
+ StompSession session = getTransactedSession(connection, txID);
+ if (session == null)
+ {
+ throw new HornetQStompException("No transaction started: " + txID);
+ }
+ transactedSessions.remove(txID);
+ session.getSession().commit();
+ }
+
+ public void abortTransaction(StompConnection connection, String txID) throws Exception
+ {
+ StompSession session = getTransactedSession(connection, txID);
+ if (session == null)
+ {
+ throw new HornetQStompException("No transaction started: " + txID);
+ }
+ transactedSessions.remove(txID);
+ session.getSession().rollback(false);
+ }
// Inner classes -------------------------------------------------
+
+ public void createSubscription(StompConnection connection,
+ String subscriptionID, String durableSubscriptionName,
+ String destination, String selector, String ack, boolean noLocal) throws Exception
+ {
+ StompSession stompSession = getSession(connection);
+ stompSession.setNoLocal(noLocal);
+ if (stompSession.containsSubscription(subscriptionID))
+ {
+ throw new HornetQStompException("There already is a subscription for: " + subscriptionID +
+ ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
+ }
+ long consumerID = server.getStorageManager().generateUniqueID();
+ String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
+ stompSession.addSubscription(consumerID,
+ subscriptionID,
+ clientID,
+ durableSubscriptionName,
+ destination,
+ selector,
+ ack);
+ }
+
+ public void unsubscribe(StompConnection connection,
+ String subscriptionID) throws Exception
+ {
+ StompSession stompSession = getSession(connection);
+ boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
+ if (!unsubscribed)
+ {
+ throw new HornetQStompException("Cannot unsubscribe as no subscription exists for id: " + subscriptionID);
+ }
+ }
+
+ public void acknowledge(StompConnection connection, String messageID, String subscriptionID) throws Exception
+ {
+ StompSession stompSession = getSession(connection);
+ stompSession.acknowledge(messageID, subscriptionID);
+ }
+
+ public void beginTransaction(StompConnection connection, String txID) throws Exception
+ {
+ if (transactedSessions.containsKey(txID))
+ {
+ throw new HornetQStompException("Transaction already started: " + txID);
+ }
+ // create the transacted session
+ getTransactedSession(connection, txID);
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -12,7 +12,6 @@
*/
package org.hornetq.core.protocol.stomp;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@@ -39,7 +38,7 @@
*
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
-class StompSession implements SessionCallback
+public class StompSession implements SessionCallback
{
private static final Logger log = Logger.getLogger(StompSession.class);
@@ -84,41 +83,9 @@
try
{
StompSubscription subscription = subscriptions.get(consumerID);
-
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(Stomp.Headers.Message.DESTINATION, serverMessage.getAddress().toString());
- if (subscription.getID() != null)
- {
- headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
- }
- HornetQBuffer buffer = serverMessage.getBodyBuffer();
-
- int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
- : serverMessage.getEndOfBodyPosition();
- int size = bodyPos - buffer.readerIndex();
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
- byte[] data = new byte[size];
- if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
- {
- headers.put(Headers.CONTENT_LENGTH, data.length);
- buffer.readBytes(data);
- }
- else
- {
- SimpleString text = buffer.readNullableSimpleString();
- if (text != null)
- {
- data = text.toString().getBytes("UTF-8");
- }
- else
- {
- data = new byte[0];
- }
- }
- serverMessage.getBodyBuffer().resetReaderIndex();
- StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
- StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
-
+
+ StompFrame frame = connection.createStompMessage(serverMessage, subscription, deliveryCount);
+
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -168,10 +135,19 @@
connection.getTransportConnection().removeReadyListener(listener);
}
- public void acknowledge(String messageID) throws Exception
+ public void acknowledge(String messageID, String subscriptionID) throws Exception
{
long id = Long.parseLong(messageID);
long consumerID = messagesToAck.remove(id);
+ StompSubscription sub = subscriptions.get(consumerID);
+
+ if (subscriptionID != null)
+ {
+ if (!sub.getID().equals(subscriptionID))
+ {
+ throw new HornetQStompException("subscription id " + subscriptionID + " does not match " + sub.getID());
+ }
+ }
session.acknowledge(consumerID, id);
session.commit();
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java 2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -32,7 +32,7 @@
*
*
*/
-class StompUtils
+public class StompUtils
{
// Constants -----------------------------------------------------
private static final String DEFAULT_MESSAGE_PRIORITY= "4";
@@ -46,8 +46,6 @@
public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception
{
- Map<String, Object> headers = new HashMap<String, Object>(frame.getHeaders());
-
String priority = (String)headers.remove(Stomp.Headers.Send.PRIORITY);
if (priority != null)
{
@@ -93,27 +91,26 @@
public static void copyStandardHeadersFromMessageToFrame(MessageInternal message, StompFrame command, int deliveryCount) throws Exception
{
- final Map<String, Object> headers = command.getHeaders();
- headers.put(Stomp.Headers.Message.DESTINATION, message.getAddress().toString());
- headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getMessageID());
+ command.addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(message.getMessageID()));
+ command.addHeader(Stomp.Headers.Message.DESTINATION, message.getAddress().toString());
if (message.getObjectProperty("JMSCorrelationID") != null)
{
- headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty("JMSCorrelationID"));
+ command.addHeader(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty("JMSCorrelationID").toString());
}
- headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getExpiration());
- headers.put(Stomp.Headers.Message.REDELIVERED, deliveryCount > 1);
- headers.put(Stomp.Headers.Message.PRORITY, "" + message.getPriority());
+ command.addHeader(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getExpiration());
+ command.addHeader(Stomp.Headers.Message.REDELIVERED, String.valueOf(deliveryCount > 1));
+ command.addHeader(Stomp.Headers.Message.PRORITY, "" + message.getPriority());
if (message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME) != null)
{
- headers.put(Stomp.Headers.Message.REPLY_TO,
+ command.addHeader(Stomp.Headers.Message.REPLY_TO,
message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME));
}
- headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getTimestamp());
+ command.addHeader(Stomp.Headers.Message.TIMESTAMP, "" + message.getTimestamp());
if (message.getObjectProperty("JMSType") != null)
{
- headers.put(Stomp.Headers.Message.TYPE, message.getObjectProperty("JMSType"));
+ command.addHeader(Stomp.Headers.Message.TYPE, message.getObjectProperty("JMSType").toString());
}
// now lets add all the message headers
@@ -127,7 +124,7 @@
continue;
}
- headers.put(name.toString(), message.getObjectProperty(name));
+ command.addHeader(name.toString(), message.getObjectProperty(name).toString());
}
}
// Constructors --------------------------------------------------
Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.UnsupportedEncodingException;
+
+import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
+import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
+import org.hornetq.core.server.ServerMessage;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public abstract class VersionedStompFrameHandler
+{
+ protected StompConnection connection;
+
+ public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version)
+ {
+ if (version == StompVersions.V1_0)
+ {
+ return new StompFrameHandlerV10(connection);
+ }
+ if (version == StompVersions.V1_1)
+ {
+ return new StompFrameHandlerV11(connection);
+ }
+ return null;
+ }
+
+ public StompFrame handleFrame(StompFrame request)
+ {
+ StompFrame response = null;
+
+ if (Stomp.Commands.SEND.equals(request.getCommand()))
+ {
+ response = onSend(request);
+ }
+ else if (Stomp.Commands.ACK.equals(request.getCommand()))
+ {
+ response = onAck(request);
+ }
+ else if (Stomp.Commands.NACK.equals(request.getCommand()))
+ {
+ response = onNack(request);
+ }
+ else if (Stomp.Commands.BEGIN.equals(request.getCommand()))
+ {
+ response = onBegin(request);
+ }
+ else if (Stomp.Commands.COMMIT.equals(request.getCommand()))
+ {
+ response = onCommit(request);
+ }
+ else if (Stomp.Commands.ABORT.equals(request.getCommand()))
+ {
+ response = onAbort(request);
+ }
+ else if (Stomp.Commands.SUBSCRIBE.equals(request.getCommand()))
+ {
+ response = onSubscribe(request);
+ }
+ else if (Stomp.Commands.UNSUBSCRIBE.equals(request.getCommand()))
+ {
+ response = onUnsubscribe(request);
+ }
+ else if (Stomp.Commands.CONNECT.equals(request.getCommand()))
+ {
+ response = onConnect(request);
+ }
+ else if (Stomp.Commands.STOMP.equals(request.getCommand()))
+ {
+ response = onStomp(request);
+ }
+ else if (Stomp.Commands.DISCONNECT.equals(request.getCommand()))
+ {
+ response = onDisconnect(request);
+ }
+ else
+ {
+ response = onUnknown(request.getCommand());
+ }
+
+ if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED) && (response == null))
+ {
+ response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+ }
+
+ return response;
+ }
+
+ public abstract StompFrame onConnect(StompFrame frame);
+ public abstract StompFrame onDisconnect(StompFrame frame);
+ public abstract StompFrame onSend(StompFrame frame);
+ public abstract StompFrame onAck(StompFrame request);
+ public abstract StompFrame onBegin(StompFrame frame);
+ public abstract StompFrame onCommit(StompFrame request);
+ public abstract StompFrame onAbort(StompFrame request);
+ public abstract StompFrame onSubscribe(StompFrame request);
+ public abstract StompFrame onUnsubscribe(StompFrame request);
+ public abstract StompFrame onStomp(StompFrame request);
+ public abstract StompFrame onNack(StompFrame request);
+
+ public StompFrame onUnknown(String command)
+ {
+ StompFrame response = new HornetQStompException("Unsupported command " + command).getFrame();
+ return response;
+ }
+
+ public StompFrame handleReceipt(String receiptID)
+ {
+ StompFrame receipt = new StompFrame(Stomp.Responses.RECEIPT);
+ receipt.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
+
+ return receipt;
+ }
+
+ public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
+ StompSubscription subscription, int deliveryCount) throws Exception;
+}
Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -0,0 +1,348 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp.v10;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.protocol.stomp.StompConnection;
+import org.hornetq.core.protocol.stomp.StompFrame;
+import org.hornetq.core.protocol.stomp.StompSubscription;
+import org.hornetq.core.protocol.stomp.StompUtils;
+import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
+import org.hornetq.core.protocol.stomp.Stomp.Headers;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.utils.DataConstants;
+
+/**
+*
+* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+*/
+public class StompFrameHandlerV10 extends VersionedStompFrameHandler
+{
+ private static final Logger log = Logger.getLogger(StompFrameHandlerV10.class);
+
+ public StompFrameHandlerV10(StompConnection connection)
+ {
+ this.connection = connection;
+ }
+
+ @Override
+ public StompFrame onConnect(StompFrame frame)
+ {
+ StompFrame response = null;
+ Map<String, String> headers = frame.getHeadersMap();
+ String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+ String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+ String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
+ String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+ if (connection.validateUser(login, passcode))
+ {
+ connection.setClientID(clientID);
+ connection.setValid(true);
+
+ response = new StompFrame(Stomp.Responses.CONNECTED);
+
+ response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
+
+ if (requestID != null)
+ {
+ response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+ }
+ }
+ else
+ {
+ //not valid
+ response = new StompFrame(Stomp.Responses.ERROR);
+ response.addHeader(Stomp.Headers.Error.MESSAGE, "Failed to connect");
+ response.setBody("The login account is not valid.");
+
+ connection.sendFrame(response);
+ connection.destroy();
+
+ return null;
+ }
+ return response;
+ }
+
+ @Override
+ public StompFrame onDisconnect(StompFrame frame)
+ {
+ connection.destroy();
+ return null;
+ }
+
+ @Override
+ public StompFrame onSend(StompFrame frame)
+ {
+ StompFrame response = null;
+ try
+ {
+ connection.validate();
+ String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
+ String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+
+ long timestamp = System.currentTimeMillis();
+
+ ServerMessageImpl message = connection.createServerMessage();
+ message.setTimestamp(timestamp);
+ message.setAddress(SimpleString.toSimpleString(destination));
+ StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
+ if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
+ {
+ message.setType(Message.BYTES_TYPE);
+ message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
+ }
+ else
+ {
+ message.setType(Message.TEXT_TYPE);
+ String text = frame.getBody();
+ message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
+ }
+
+ connection.sendServerMessage(message, txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+ catch (Exception e)
+ {
+ response = new HornetQStompException("Error handling send", e).getFrame();
+ }
+
+ return response;
+ }
+
+ @Override
+ public StompFrame onBegin(StompFrame frame)
+ {
+ StompFrame response = null;
+ String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+ if (txID == null)
+ {
+ response = new HornetQStompException("Need a transaction id to begin").getFrame();
+ }
+ else
+ {
+ try
+ {
+ connection.beginTransaction(txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+ }
+ return response;
+ }
+
+ @Override
+ public StompFrame onCommit(StompFrame request)
+ {
+ StompFrame response = null;
+
+ String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+ if (txID == null)
+ {
+ response = new HornetQStompException("transaction header is mandatory to COMMIT a transaction").getFrame();
+ return response;
+ }
+
+ try
+ {
+ connection.commitTransaction(txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+ return response;
+ }
+
+ @Override
+ public StompFrame onAbort(StompFrame request)
+ {
+ StompFrame response = null;
+ String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+
+ if (txID == null)
+ {
+ response = new HornetQStompException("transaction header is mandatory to ABORT a transaction").getFrame();
+ return response;
+ }
+
+ try
+ {
+ connection.abortTransaction(txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+
+ return response;
+ }
+
+ @Override
+ public StompFrame onSubscribe(StompFrame request)
+ {
+ StompFrame response = null;
+ String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
+
+ String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
+ String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
+ String id = request.getHeader(Stomp.Headers.Subscribe.ID);
+ String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
+ boolean noLocal = false;
+
+ if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL))
+ {
+ noLocal = Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
+ }
+
+ try
+ {
+ connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+
+ return response;
+ }
+
+ @Override
+ public StompFrame onUnsubscribe(StompFrame request)
+ {
+ StompFrame response = null;
+ String destination = request.getHeader(Stomp.Headers.Unsubscribe.DESTINATION);
+ String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
+
+ String subscriptionID = null;
+ if (id != null)
+ {
+ subscriptionID = id;
+ }
+ else
+ {
+ if (destination == null)
+ {
+ response = new HornetQStompException("Must specify the subscription's id or " +
+ "the destination you are unsubscribing from").getFrame();
+ return response;
+ }
+ subscriptionID = "subscription/" + destination;
+ }
+
+ try
+ {
+ connection.unsubscribe(subscriptionID);
+ }
+ catch (HornetQStompException e)
+ {
+ return e.getFrame();
+ }
+ return response;
+ }
+
+ @Override
+ public StompFrame onAck(StompFrame request)
+ {
+ StompFrame response = null;
+
+ String messageID = request.getHeader(Stomp.Headers.Ack.MESSAGE_ID);
+ String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+
+ if (txID != null)
+ {
+ log.warn("Transactional acknowledgement is not supported");
+ }
+
+ try
+ {
+ connection.acknowledge(messageID, null);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+
+ return response;
+ }
+
+ @Override
+ public StompFrame onStomp(StompFrame request)
+ {
+ return onUnknown(request.getCommand());
+ }
+
+ @Override
+ public StompFrame onNack(StompFrame request)
+ {
+ return onUnknown(request.getCommand());
+ }
+
+ @Override
+ public StompFrame createMessageFrame(ServerMessage serverMessage,
+ StompSubscription subscription, int deliveryCount) throws Exception
+ {
+ StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE);
+
+ if (subscription.getID() != null)
+ {
+ frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
+ }
+ HornetQBuffer buffer = serverMessage.getBodyBuffer();
+
+ int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
+ : serverMessage.getEndOfBodyPosition();
+ int size = bodyPos - buffer.readerIndex();
+ buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+ byte[] data = new byte[size];
+ if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
+ {
+ frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
+ buffer.readBytes(data);
+ }
+ else
+ {
+ SimpleString text = buffer.readNullableSimpleString();
+ if (text != null)
+ {
+ data = text.toString().getBytes("UTF-8");
+ }
+ else
+ {
+ data = new byte[0];
+ }
+ }
+ serverMessage.getBodyBuffer().resetReaderIndex();
+
+ StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+
+ return frame;
+
+ }
+
+}
Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -0,0 +1,362 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp.v11;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.protocol.stomp.StompConnection;
+import org.hornetq.core.protocol.stomp.StompFrame;
+import org.hornetq.core.protocol.stomp.StompSubscription;
+import org.hornetq.core.protocol.stomp.StompUtils;
+import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
+import org.hornetq.core.protocol.stomp.Stomp.Headers;
+import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.utils.DataConstants;
+
+public class StompFrameHandlerV11 extends VersionedStompFrameHandler
+{
+ private static final Logger log = Logger.getLogger(StompFrameHandlerV11.class);
+
+ public StompFrameHandlerV11(StompConnection connection)
+ {
+ this.connection = connection;
+ }
+
+ @Override
+ public StompFrame onConnect(StompFrame frame)
+ {
+ StompFrame response = null;
+ Map<String, String> headers = frame.getHeadersMap();
+ String login = headers.get(Stomp.Headers.Connect.LOGIN);
+ String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
+ String clientID = headers.get(Stomp.Headers.Connect.CLIENT_ID);
+ String requestID = headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+ if (connection.validateUser(login, passcode))
+ {
+ connection.setClientID(clientID);
+ connection.setValid(true);
+
+ response = new StompFrame(Stomp.Responses.CONNECTED);
+
+ //version
+ response.addHeader(Stomp.Headers.Connected.VERSION, connection.getVersion());
+
+ //session
+ response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
+
+ //server
+ response.addHeader(Stomp.Headers.Connected.SERVER, connection.getHornetQServerName());
+
+ if (requestID != null)
+ {
+ response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+ }
+ }
+ else
+ {
+ //not valid
+ response = new StompFrame(Stomp.Responses.ERROR);
+ response.addHeader(Stomp.Headers.Error.VERSION, "1.0,1.1");
+
+ response.setBody("Supported protocol versions are 1.0 and 1.1");
+
+ connection.sendFrame(response);
+ connection.destroy();
+
+ return null;
+ }
+ return response;
+ }
+
+ @Override
+ public StompFrame onDisconnect(StompFrame frame)
+ {
+ connection.destroy();
+ return null;
+ }
+
+ @Override
+ public StompFrame onSend(StompFrame frame)
+ {
+ StompFrame response = null;
+ try
+ {
+ connection.validate();
+ String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
+ String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+
+ long timestamp = System.currentTimeMillis();
+
+ ServerMessageImpl message = connection.createServerMessage();
+ message.setTimestamp(timestamp);
+ message.setAddress(SimpleString.toSimpleString(destination));
+ StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
+ if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
+ {
+ message.setType(Message.BYTES_TYPE);
+ message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
+ }
+ else
+ {
+ message.setType(Message.TEXT_TYPE);
+ String text = frame.getBody();
+ message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
+ }
+
+ connection.sendServerMessage(message, txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+ catch (Exception e)
+ {
+ response = new HornetQStompException("Error handling send", e).getFrame();
+ }
+
+ return response;
+ }
+
+ @Override
+ public StompFrame onBegin(StompFrame frame)
+ {
+ StompFrame response = null;
+ String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+ if (txID == null)
+ {
+ response = new HornetQStompException("Need a transaction id to begin").getFrame();
+ }
+ else
+ {
+ try
+ {
+ connection.beginTransaction(txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+ }
+ return response;
+ }
+
+ @Override
+ public StompFrame onCommit(StompFrame request)
+ {
+ StompFrame response = null;
+
+ String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+ if (txID == null)
+ {
+ response = new HornetQStompException("transaction header is mandatory to COMMIT a transaction").getFrame();
+ return response;
+ }
+
+ try
+ {
+ connection.commitTransaction(txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+ return response;
+ }
+
+ @Override
+ public StompFrame onAbort(StompFrame request)
+ {
+ StompFrame response = null;
+ String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+
+ if (txID == null)
+ {
+ response = new HornetQStompException("transaction header is mandatory to ABORT a transaction").getFrame();
+ return response;
+ }
+
+ try
+ {
+ connection.abortTransaction(txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+
+ return response;
+ }
+
+ @Override
+ public StompFrame onSubscribe(StompFrame request)
+ {
+ StompFrame response = null;
+ String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
+
+ String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
+ String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
+ String id = request.getHeader(Stomp.Headers.Subscribe.ID);
+ String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
+ boolean noLocal = false;
+
+ if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL))
+ {
+ noLocal = Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
+ }
+
+ try
+ {
+ connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+
+ return response;
+ }
+
+ @Override
+ public StompFrame onUnsubscribe(StompFrame request)
+ {
+ StompFrame response = null;
+ //unsubscribe in 1.1 only needs id header
+ String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
+
+ String subscriptionID = null;
+ if (id != null)
+ {
+ subscriptionID = id;
+ }
+ else
+ {
+ response = new HornetQStompException("Must specify the subscription's id").getFrame();
+ return response;
+ }
+
+ try
+ {
+ connection.unsubscribe(subscriptionID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+ return response;
+ }
+
+ @Override
+ public StompFrame onAck(StompFrame request)
+ {
+ StompFrame response = null;
+
+ String messageID = request.getHeader(Stomp.Headers.Ack.MESSAGE_ID);
+ String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+ String subscriptionID = request.getHeader(Stomp.Headers.Ack.SUBSCRIPTION);
+
+ if (txID != null)
+ {
+ log.warn("Transactional acknowledgement is not supported");
+ }
+
+ if (subscriptionID == null)
+ {
+ response = new HornetQStompException("subscription header is required").getFrame();
+ return response;
+ }
+
+ try
+ {
+ connection.acknowledge(messageID, subscriptionID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+
+ return response;
+ }
+
+ @Override
+ public StompFrame onStomp(StompFrame request)
+ {
+ return onConnect(request);
+ }
+
+ @Override
+ public StompFrame onNack(StompFrame request)
+ {
+ //this eventually means discard the message (it never be redelivered again).
+ //we can consider supporting redeliver to a different sub.
+ return onAck(request);
+ }
+
+ @Override
+ public StompFrame createMessageFrame(ServerMessage serverMessage,
+ StompSubscription subscription, int deliveryCount)
+ throws Exception
+ {
+ StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE);
+
+ if (subscription.getID() != null)
+ {
+ frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
+ }
+
+ HornetQBuffer buffer = serverMessage.getBodyBuffer();
+
+ int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
+ : serverMessage.getEndOfBodyPosition();
+ int size = bodyPos - buffer.readerIndex();
+ buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+ byte[] data = new byte[size];
+ if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
+ {
+ frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
+ buffer.readBytes(data);
+ }
+ else
+ {
+ SimpleString text = buffer.readNullableSimpleString();
+ if (text != null)
+ {
+ data = text.toString().getBytes("UTF-8");
+ }
+ else
+ {
+ data = new byte[0];
+ }
+ }
+ frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
+
+ serverMessage.getBodyBuffer().resetReaderIndex();
+
+ StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+
+ return frame;
+
+ }
+
+}
13 years, 4 months
JBoss hornetq SVN: r11292 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-02 07:52:55 -0400 (Fri, 02 Sep 2011)
New Revision: 11292
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
reduce visibility
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-02 11:52:16 UTC (rev 11291)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-02 11:52:55 UTC (rev 11292)
@@ -604,7 +604,8 @@
return new LargeServerMessageImpl(this);
}
- public void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes) throws Exception
+ protected void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes)
+ throws Exception
{
readLock();
try
13 years, 4 months
JBoss hornetq SVN: r11291 - in branches/HORNETQ-720_Replication: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-02 07:52:16 -0400 (Fri, 02 Sep 2011)
New Revision: 11291
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 Add tests for large-message delete, and improve other tests
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-02 00:30:14 UTC (rev 11290)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-02 11:52:16 UTC (rev 11291)
@@ -443,7 +443,10 @@
}
/**
- * Assumes the
+ * Collects a list of existing large messages and their current size.
+ * <p>
+ * So we know how much of a given message to sync with the backup. Further data appends to the
+ * messages will be replicated normally.
* @return
* @throws Exception
*/
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-02 00:30:14 UTC (rev 11290)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-02 11:52:16 UTC (rev 11291)
@@ -6,6 +6,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
@@ -20,12 +21,13 @@
public class BackupSyncJournalTest extends FailoverTestBase
{
+ private static final int BACKUP_WAIT_TIME = 20;
private ServerLocatorInternal locator;
private ClientSessionFactoryInternal sessionFactory;
private ClientSession session;
private ClientProducer producer;
private BackupSyncDelay syncDelay;
- private static final int N_MSGS = 10;
+ protected static final int N_MSGS = 10;
@Override
protected void setUp() throws Exception
@@ -51,9 +53,10 @@
public void testReserveFileIdValuesOnBackup() throws Exception
{
+ final int totalRounds = 5;
createProducerSendSomeMessages();
JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
- for (int i = 0; i < 5; i++)
+ for (int i = 0; i < totalRounds; i++)
{
messageJournal.forceMoveNextFile();
sendMessages(session, producer, N_MSGS);
@@ -61,7 +64,7 @@
backupServer.start();
- waitForBackup(sessionFactory, 10, false);
+ waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
// SEND more messages, now with the backup replicating
sendMessages(session, producer, N_MSGS);
@@ -72,24 +75,53 @@
JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
Set<Long> backupIds = getFileIds(backupMsgJournal);
assertEquals("File IDs must match!", liveIds, backupIds);
+
+ // "+ 2": there two other calls that send N_MSGS.
+ for (int i = 0; i < totalRounds + 2; i++)
+ {
+ receiveMsgsInRange(0, N_MSGS);
+ }
+ assertNoMoreMessages();
}
+ private void assertNoMoreMessages() throws HornetQException
+ {
+ session.start();
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ ClientMessage msg = consumer.receive(200);
+ assertNull("there should be no more messages to receive! " + msg, msg);
+ consumer.close();
+ session.commit();
+
+ }
+
+ protected void startBackupFinishSyncing() throws Exception
+ {
+ backupServer.start();
+ syncDelay.deliverUpToDateMsg();
+ waitForBackup(sessionFactory, BACKUP_WAIT_TIME, true);
+ }
+
public void testReplicationDuringSync() throws Exception
{
createProducerSendSomeMessages();
backupServer.start();
- waitForBackup(sessionFactory, 10, false);
+ waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
sendMessages(session, producer, N_MSGS);
session.commit();
- receiveMsgs(0, N_MSGS);
+ receiveMsgsInRange(0, N_MSGS);
+
finishSyncAndFailover();
+
+ receiveMsgsInRange(0, N_MSGS);
+ assertNoMoreMessages();
}
private void finishSyncAndFailover() throws Exception
{
syncDelay.deliverUpToDateMsg();
- waitForBackup(sessionFactory, 10, true);
+ waitForBackup(sessionFactory, BACKUP_WAIT_TIME, true);
assertFalse("should not be initialized", backupServer.getServer().isInitialised());
crash(session);
waitForServerInitialization(backupServer, 5);
@@ -99,15 +131,17 @@
{
createProducerSendSomeMessages();
startBackupCrashLive();
- receiveMsgs(0, N_MSGS);
+ receiveMsgsInRange(0, N_MSGS);
+ assertNoMoreMessages();
}
public void testMessageSync() throws Exception
{
createProducerSendSomeMessages();
- receiveMsgs(0, N_MSGS / 2);
+ receiveMsgsInRange(0, N_MSGS / 2);
startBackupCrashLive();
- receiveMsgs(N_MSGS / 2, N_MSGS);
+ receiveMsgsInRange(N_MSGS / 2, N_MSGS);
+ assertNoMoreMessages();
}
private void startBackupCrashLive() throws Exception
@@ -115,12 +149,12 @@
assertFalse("backup is started?", backupServer.isStarted());
liveServer.removeInterceptor(syncDelay);
backupServer.start();
- waitForBackup(sessionFactory, 20);
+ waitForBackup(sessionFactory, BACKUP_WAIT_TIME);
crash(session);
waitForServerInitialization(backupServer, 5);
}
- private void createProducerSendSomeMessages() throws HornetQException, Exception
+ protected void createProducerSendSomeMessages() throws HornetQException, Exception
{
session = sessionFactory.createSession(true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -129,7 +163,7 @@
session.commit();
}
- private void receiveMsgs(int start, int end) throws HornetQException
+ protected void receiveMsgsInRange(int start, int end) throws HornetQException
{
session.start();
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-09-02 00:30:14 UTC (rev 11290)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-09-02 11:52:16 UTC (rev 11291)
@@ -1,5 +1,9 @@
package org.hornetq.tests.integration.cluster.failover;
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -7,10 +11,6 @@
public class BackupSyncLargeMessageTest extends BackupSyncJournalTest
{
- /**
- * @param i
- * @param message
- */
@Override
protected void assertMessageBody(final int i, final ClientMessage message)
{
@@ -25,13 +25,36 @@
return (ServerLocatorInternal)locator;
}
- /**
- * @param i
- * @param message
- */
@Override
protected void setBody(final int i, final ClientMessage message) throws Exception
{
setLargeMessageBody(i, message);
}
+
+ // ------------------------
+
+ public void testDeleteLargeMessages() throws Exception
+ {
+ createProducerSendSomeMessages();
+ startBackupFinishSyncing();
+ File dir = new File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
+ System.out.println("Dir " + dir.getAbsolutePath() + " " + dir.exists());
+ // Set<Long> idsOnBkp = getAllMessageFileIds(dir);
+ receiveMsgsInRange(0, N_MSGS / 2);
+ assertEquals("we really ought to delete these after delivery", N_MSGS / 2, getAllMessageFileIds(dir).size());
+ }
+
+ private Set<Long> getAllMessageFileIds(File dir)
+ {
+ Set<Long> idsOnBkp = new HashSet<Long>();
+ for (String filename : dir.list())
+ {
+ if (filename.endsWith(".msg"))
+ {
+ idsOnBkp.add(Long.valueOf(filename.split("\\.")[0]));
+ }
+ }
+ return idsOnBkp;
+ }
+
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-02 00:30:14 UTC (rev 11290)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-02 11:52:16 UTC (rev 11291)
@@ -43,6 +43,7 @@
{
if (backup.isStarted())
handler.deliver();
+ live.removeInterceptor(this);
}
public BackupSyncDelay(TestableServer backup, TestableServer live)
13 years, 4 months