JBoss hornetq SVN: r9753 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-05 13:33:33 -0400 (Tue, 05 Oct 2010)
New Revision: 9753
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
Reload Cursors
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java 2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java 2010-10-05 17:33:33 UTC (rev 9753)
@@ -81,4 +81,6 @@
SimpleString[] getStoreNames();
void deletePageStore(SimpleString storeName) throws Exception;
+
+ void processReload();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-05 17:33:33 UTC (rev 9753)
@@ -63,6 +63,8 @@
Page createPage(final int page) throws Exception;
PageCursorProvider getCursorProvier();
+
+ void processReload();
/**
* @return false if a thread was already started, or if not in page mode
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-05 17:33:33 UTC (rev 9753)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
/**
* A PageCursor
@@ -26,28 +27,32 @@
public interface PageCursor
{
+ // Cursor query operations --------------------------------------
+
Pair<PagePosition, ServerMessage> moveNext() throws Exception;
- PagePosition getFirstPosition();
-
void ack(PagePosition position) throws Exception;
- void ackTx(long tx, PagePosition position) throws Exception;
+ void ackTx(Transaction tx, PagePosition position) throws Exception;
+ // Reload operations
+
/**
* @param position
*/
- void recoverACK(PagePosition position);
+ void reloadACK(PagePosition position);
/**
* To be used to avoid a redelivery of a prepared ACK after load
* @param position
*/
- void recoverPreparedACK(PagePosition position);
+ void reloadPreparedACK(Transaction tx, PagePosition position);
+
+ void processReload();
/**
* To be used on redeliveries
* @param position
*/
- void returnElement(PagePosition position);
+ void redeliver(PagePosition position);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-05 17:33:33 UTC (rev 9753)
@@ -58,6 +58,8 @@
Pair<PagePosition, ServerMessage> getAfter(PagePosition pos) throws Exception;
+ void processReload();
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-05 17:33:33 UTC (rev 9753)
@@ -58,7 +58,7 @@
*/
public Page getPage()
{
- return null;
+ return page;
}
/* (non-Javadoc)
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-05 17:33:33 UTC (rev 9753)
@@ -13,6 +13,10 @@
package org.hornetq.core.paging.cursor.impl;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
import org.hornetq.api.core.Pair;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCursor;
@@ -20,6 +24,7 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
/**
* A PageCursorImpl
@@ -44,6 +49,8 @@
private final PageCursorProvider cursorProvider;
private volatile PagePosition lastPosition;
+
+ private List<PagePosition> recoveredACK;
// Static --------------------------------------------------------
@@ -94,64 +101,93 @@
store.storeCursorAcknowledge(cursorId, position);
}
- public void ackTx(final long tx, final PagePosition position) throws Exception
+ public void ackTx(final Transaction tx, final PagePosition position) throws Exception
{
- store.storeCursorAcknowledgeTransactional(tx, cursorId, position);
+ store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
+ installTXCallback(tx, position);
+ // tx.afterCommit()
}
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void returnElement(final PagePosition position)
+ public void redeliver(final PagePosition position)
{
// TODO Auto-generated method stub
}
+
+ /**
+ * Theres no need to synchronize this method as it's only called from journal load on startup
+ */
+ public void reloadACK(final PagePosition position)
+ {
+ internalAdd(position);
+
+ }
+
/* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPosition()
+ * @see org.hornetq.core.paging.cursor.PageCursor#recoverPreparedACK(org.hornetq.core.paging.cursor.PagePosition)
*/
- public PagePosition getFirstPosition()
+ public void reloadPreparedACK(final Transaction tx, final PagePosition position)
{
- // TODO Auto-generated method stub
- return null;
+ internalAdd(position);
+ installTXCallback(tx, position);
}
+ public void processReload()
+ {
+ if (this.recoveredACK != null)
+ {
+ Collections.sort(recoveredACK);
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
protected boolean match(final ServerMessage message)
{
+ // To be used with expressions
return true;
}
+
+
// Private -------------------------------------------------------
+
+ /**
+ * @param committedACK
+ */
+ private void internalAdd(final PagePosition committedACK)
+ {
+ if (recoveredACK == null)
+ {
+ recoveredACK = new LinkedList<PagePosition>();
+ }
+
+ recoveredACK.add(committedACK);
+ }
+
private PagePosition recoverLastPosition()
{
long firstPage = pageStore.getFirstPage();
return new PagePositionImpl(firstPage, -1);
}
+
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursor#recoverACK(org.hornetq.core.paging.cursor.PagePosition)
+ /**
+ * @param tx
+ * @param position
*/
- public void recoverACK(final PagePosition position)
+ private void installTXCallback(Transaction tx, PagePosition position)
{
- // TODO Auto-generated method stub
+ }
- }
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursor#recoverPreparedACK(org.hornetq.core.paging.cursor.PagePosition)
- */
- public void recoverPreparedACK(final PagePosition position)
- {
- // TODO Auto-generated method stub
-
- }
-
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-05 17:33:33 UTC (rev 9753)
@@ -94,6 +94,8 @@
*/
public Pair<PagePosition, ServerMessage> getAfter(final PagePosition pos) throws Exception
{
+ // TODO: consider page transactions here to avoid receiving an uncommitted message
+ // TODO: consider the case where a page came empty because of an ignored PageTX
PagePosition retPos = pos.nextMessage();
PageCache cache = getPageCache(pos.getPageNr());
@@ -179,6 +181,14 @@
return softCache.size();
}
+ public void processReload()
+ {
+ for (PageCursor cursor : this.activeCursors.values())
+ {
+ cursor.processReload();
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-10-05 17:33:33 UTC (rev 9753)
@@ -228,7 +228,16 @@
}
}
}
+
+ public void processReload()
+ {
+ for (PagingStore store: stores.values())
+ {
+ store.processReload();
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-05 17:33:33 UTC (rev 9753)
@@ -355,7 +355,14 @@
currentPageLock.readLock().unlock();
}
}
+
+ public void processReload()
+ {
+ cursorProvider.processReload();
+ }
+
+
// HornetQComponent implementation
public synchronized boolean isStarted()
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-05 17:33:33 UTC (rev 9753)
@@ -1019,11 +1019,11 @@
SimpleString address = queueInfo.getAddress();
PagingStore store = pagingManager.getPageStore(address);
PageCursor cursor = store.getCursorProvier().getCursor(encoding.queueID);
- cursor.recoverACK(encoding.position);
+ cursor.reloadACK(encoding.position);
}
else
{
- log.warn("Can't find queue " + queueInfo.getId() + " while reloading ACKNOWLEDGE_CURSOR");
+ log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
}
break;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-05 17:33:33 UTC (rev 9753)
@@ -1114,6 +1114,8 @@
deploymentManager.start();
}
+ pagingManager.reloadStores();
+
pagingManager.resumeDepages();
final ServerInfo dumper = new ServerInfo(this, pagingManager);
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-05 17:33:33 UTC (rev 9753)
@@ -1027,6 +1027,15 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#processReload()
+ */
+ public void processReload()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeStorageManager implements StorageManager
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-10-05 17:33:33 UTC (rev 9753)
@@ -323,6 +323,16 @@
return null;
}
+
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#processReload()
+ */
+ public void processReload()
+ {
+ }
+
}
}
13 years, 6 months
JBoss hornetq SVN: r9752 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-05 13:26:00 -0400 (Tue, 05 Oct 2010)
New Revision: 9752
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/LockFileImplTest.java
Log:
uncommented lock file test
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/LockFileImplTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/LockFileImplTest.java 2010-10-05 16:30:24 UTC (rev 9751)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/LockFileImplTest.java 2010-10-05 17:26:00 UTC (rev 9752)
@@ -103,7 +103,7 @@
// 1. Run the class as a Java application to execute the main() in a separate VM
// 2. Run this test
- public void _testInterrupt() throws Exception
+ public void testInterrupt() throws Exception
{
Activation t = new Activation();
t.start();
13 years, 6 months
JBoss hornetq SVN: r9751 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-05 12:30:24 -0400 (Tue, 05 Oct 2010)
New Revision: 9751
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
Log:
fixed reattach test
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2010-10-05 16:24:15 UTC (rev 9750)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2010-10-05 16:30:24 UTC (rev 9751)
@@ -1241,14 +1241,12 @@
* @return
*/
@Override
- protected ClientSessionFactoryInternal createSessionFactory() throws Exception
+ protected ServerLocator createLocator() throws Exception
{
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
locator.setReconnectAttempts(-1);
locator.setConfirmationWindowSize(1024 * 1024);
- final ClientSessionFactory sf = locator.createSessionFactory();
-
- return (ClientSessionFactoryInternal) sf;
+ return locator;
}
@Override
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java 2010-10-05 16:24:15 UTC (rev 9750)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadReattachSupport.java 2010-10-05 16:30:24 UTC (rev 9751)
@@ -23,6 +23,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
@@ -66,7 +67,7 @@
protected abstract void stop() throws Exception;
- protected abstract ClientSessionFactoryInternal createSessionFactory() throws Exception;
+ protected abstract ServerLocator createLocator() throws Exception;
@Override
protected void setUp() throws Exception
@@ -100,8 +101,10 @@
start();
- final ClientSessionFactoryInternal sf = createSessionFactory();
+ final ServerLocator locator = createLocator();
+ final ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
+
final ClientSession session = sf.createSession(false, true, true);
Failer failer = startFailer(failDelay, session, failOnCreateConnection);
@@ -174,6 +177,8 @@
session.close();
+ locator.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2010-10-05 16:24:15 UTC (rev 9750)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2010-10-05 16:30:24 UTC (rev 9751)
@@ -47,13 +47,13 @@
}
@Override
- protected ClientSessionFactoryInternal createSessionFactory() throws Exception
+ protected ServerLocator createLocator() throws Exception
{
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory")) ;
locator.setReconnectAttempts(-1);
locator.setConfirmationWindowSize(1024 * 1024);
locator.setAckBatchSize(0);
- return (ClientSessionFactoryInternal) locator.createSessionFactory();
+ return locator;
}
}
13 years, 6 months
JBoss hornetq SVN: r9750 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-05 12:24:15 -0400 (Tue, 05 Oct 2010)
New Revision: 9750
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
Log:
fixed random reattach test
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2010-10-05 16:20:43 UTC (rev 9749)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2010-10-05 16:24:15 UTC (rev 9750)
@@ -249,6 +249,8 @@
session.close();
+ locator.close();
+
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
13 years, 6 months
JBoss hornetq SVN: r9749 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/ssl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-05 12:20:43 -0400 (Tue, 05 Oct 2010)
New Revision: 9749
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java
Log:
fixed ssl test
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java 2010-10-05 16:13:58 UTC (rev 9748)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java 2010-10-05 16:20:43 UTC (rev 9749)
@@ -95,10 +95,9 @@
tc.getParams().put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "invalid password");
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(tc);
- ClientSessionFactory sf = locator.createSessionFactory();
try
{
- sf.createSession(false, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
Assert.fail();
}
catch (HornetQException e)
13 years, 6 months
JBoss hornetq SVN: r9748 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/server/cluster/impl and 5 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-10-05 12:13:58 -0400 (Tue, 05 Oct 2010)
New Revision: 9748
Modified:
branches/2_2_0_HA_Improvements/build-hornetq.xml
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/FailoverWithSharedStoreTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java
Log:
test fixes
Modified: branches/2_2_0_HA_Improvements/build-hornetq.xml
===================================================================
--- branches/2_2_0_HA_Improvements/build-hornetq.xml 2010-10-05 09:51:50 UTC (rev 9747)
+++ branches/2_2_0_HA_Improvements/build-hornetq.xml 2010-10-05 16:13:58 UTC (rev 9748)
@@ -1376,6 +1376,17 @@
<fileset dir="${test.classes.dir}">
<!-- excluded because of https://jira.jboss.org/jira/browse/HORNETQ-65 -->
<exclude name="**/cluster/failover/*StaticClusterWithBackupFailoverTest.class" />
+ <!--exclude any replication tests for now-->
+ <exclude name="**/cluster/failover/*ClusterWithBackupFailoverTestBase.class"/>
+
+ <exclude name="**/cluster/failover/*DiscoveryClusterWithBackupFailoverTest.class"/>
+ <exclude name="**/cluster/failover/*GroupingFailoverReplicationTest.class"/>
+ <exclude name="**/cluster/failover/*Replicated*.class"/>
+ <exclude name="**/cluster/replication/**.class"/>
+ <exclude name="**/cluster/failover/*ReplicatedDistributionTest.class"/>
+ <exclude name="**/cluster/failover/*SharedStoreDistributionTest.class"/>
+ <exclude name="**/cluster/failover/*ReplicatedNettyAsynchronousFailoverTest.class"/>
+ <exclude name="**/cluster/failover/Remote*.class"/>
<include name="${tests.param}"/>
</fileset>
</batchtest>
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java 2010-10-05 09:51:50 UTC (rev 9747)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/FakeLockFile.java 2010-10-05 16:13:58 UTC (rev 9748)
@@ -15,9 +15,7 @@
import java.io.File;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.WeakHashMap;
+import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -79,7 +77,7 @@
e.printStackTrace();
throw new IllegalStateException(e);
}
-
+
if(!f.exists())
{
throw new IllegalStateException("unable to create " + directory + fileName);
@@ -110,10 +108,10 @@
}
}
- public boolean unlock() throws IOException
+ public synchronized boolean unlock() throws IOException
{
+ semaphore.drainPermits();
semaphore.release();
-
return true;
}
@@ -134,4 +132,21 @@
}
locks.clear();
}
+
+ public static void clearLocks(String dir)
+ {
+ List<String> toClear = new ArrayList<String>();
+ for (Map.Entry<String, Semaphore> e : locks.entrySet())
+ {
+ if(e.getKey().startsWith(dir))
+ {
+ e.getValue().drainPermits();
+ toClear.add(e.getKey());
+ }
+ }
+ for (String s : toClear)
+ {
+ locks.remove(s);
+ }
+ }
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-05 09:51:50 UTC (rev 9747)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-05 16:13:58 UTC (rev 9748)
@@ -568,7 +568,7 @@
log.info("Backup server waiting for live lock file creation");
while (!liveLockFile.exists())
{
- log.info("Waiting for server live lock file. Live server is not started");
+ log.debug("Waiting for server live lock file. Live server is not started");
Thread.sleep(2000);
}
@@ -577,10 +577,12 @@
liveLock = createLockFile("live.lock", configuration.getJournalDirectory());
- clusterManager.start();
+ clusterManager.start();
- log.info("Backup server is up - waiting for failover");
+ started = true;
+ log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeID + "] started");
+
liveLock.lock();
// We need to test if the file exists again, since the live might have shutdown
@@ -730,12 +732,12 @@
activation.run();
}
+ started = true;
+
+ HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "] started");
}
- started = true;
- HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeID + "] started");
-
if (configuration.isBackup())
{
if (configuration.isSharedStore())
@@ -795,6 +797,10 @@
}
// we stop the remoting service outside a lock
+ if(remotingService == null)
+ {
+ System.out.println("HornetQServerImpl.stop");
+ }
remotingService.stop();
synchronized (this)
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-10-05 09:51:50 UTC (rev 9747)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-10-05 16:13:58 UTC (rev 9748)
@@ -47,6 +47,7 @@
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.tests.util.ServiceTestBase;
@@ -1247,7 +1248,7 @@
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(getDefaultJournalType());
configuration.setSharedStore(sharedStorage);
- if (sharedStorage)
+ if (sharedStorage && backup)
{
// Shared storage will share the node between the backup and live node
int nodeDirectoryToUse = backupNode == -1 ? node : backupNode;
@@ -1565,8 +1566,26 @@
servers[node].start();
ClusterTestBase.log.info("started server " + node);
+
}
- }
+ for (int node : nodes)
+ {
+ //wait for each server to start, it may be a backup and started in a separate thread
+ long timetowait =System.currentTimeMillis() + 5000;
+ while(!servers[node].isStarted())
+ {
+ Thread.sleep(100);
+ if(servers[node].isStarted())
+ {
+ break;
+ }
+ else if(System.currentTimeMillis() > timetowait)
+ {
+ fail("server didnt start");
+ }
+ }
+ }
+ }
protected void stopClusterConnections(final int... nodes) throws Exception
{
@@ -1593,6 +1612,7 @@
ClusterTestBase.log.info("stopping server " + node);
servers[node].stop();
ClusterTestBase.log.info("server stopped");
+ FakeLockFile.clearLocks(servers[node].getConfiguration().getJournalDirectory());
}
catch (Exception e)
{
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-10-05 09:51:50 UTC (rev 9747)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-10-05 16:13:58 UTC (rev 9748)
@@ -23,6 +23,7 @@
package org.hornetq.tests.integration.cluster.distribution;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.tests.util.UnitTestCase;
/**
@@ -387,9 +388,9 @@
closeSessionFactory(0);
closeSessionFactory(3);
- stopServers(0, 3, 5, 8);
+ stopServers(5, 8, 0, 3);
- startServers(5, 8, 0, 3);
+ startServers(0, 3, 5, 8);
Thread.sleep(2000);
@@ -562,11 +563,11 @@
protected void setupServers() throws Exception
{
// The backups
- setupServer(5, isFileStorage(), isNetty(), true, true);
- setupServer(6, isFileStorage(), isNetty(), true, true);
- setupServer(7, isFileStorage(), isNetty(), true, true);
- setupServer(8, isFileStorage(), isNetty(), true, true);
- setupServer(9, isFileStorage(), isNetty(), true, true);
+ setupServer(5, isFileStorage(), isNetty(), true, 0, true);
+ setupServer(6, isFileStorage(), isNetty(), true, 1, true);
+ setupServer(7, isFileStorage(), isNetty(), true, 2, true);
+ setupServer(8, isFileStorage(), isNetty(), true, 3, true);
+ setupServer(9, isFileStorage(), isNetty(), true, 4, true);
// The lives
setupServer(0, isFileStorage(), isNetty(), 5, true);
@@ -587,7 +588,7 @@
getServer(8).getConfiguration().setBackup(true);
getServer(9).getConfiguration().setBackup(true);
- startServers(5, 6, 7, 8, 9, 0, 1, 2, 3, 4);
+ startServers( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
}
@Override
@@ -597,7 +598,7 @@
closeAllSessionFactories();
- stopServers(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ stopServers(5, 6, 7, 8, 9, 0, 1, 2, 3, 4);
}
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-10-05 09:51:50 UTC (rev 9747)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-10-05 16:13:58 UTC (rev 9748)
@@ -226,75 +226,8 @@
Assert.assertEquals(0, sf.numConnections());
}
- /** It doesn't fail, but it restart both servers, live and backup, and the data should be received after the restart,
- * and the servers should be able to connect without any problems. */
- public void testRestartServers() throws Exception
- {
- ServerLocator locator = getServerLocator();
+
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
-
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
- ClientSession session = sf.createSession(true, true);
-
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
-
- ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(true);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
- session.commit();
-
- session.close();
-
- liveServer.stop();
- FakeLockFile.clearLocks();
- liveServer.start();
-
- sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
-
- session = sf.createSession(true, true);
-
- ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
-
- session.start();
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
- }
-
// https://jira.jboss.org/jira/browse/HORNETQ-285
public void testFailoverOnInitialConnection() throws Exception
{
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-10-05 09:51:50 UTC (rev 9747)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-10-05 16:13:58 UTC (rev 9748)
@@ -13,6 +13,9 @@
package org.hornetq.tests.integration.cluster.failover;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -217,6 +220,26 @@
InVMConnector.failOnCreateConnection = false;
super.tearDown();
+ try
+ {
+ ServerSocket serverSocket = new ServerSocket(5445);
+ serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ System.exit(9);
+ }
+ try
+ {
+ ServerSocket serverSocket = new ServerSocket(5446);
+ serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ System.exit(9);
+ }
}
protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator, int topologyMembers)
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2010-10-05 09:51:50 UTC (rev 9747)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2010-10-05 16:13:58 UTC (rev 9748)
@@ -55,10 +55,10 @@
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
- startServers(2, 0, 1);
try
{
+ startServers(2, 0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -146,10 +146,10 @@
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
- startServers(2, 0, 1);
try
{
+ startServers(2, 0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteFailoverTest.java 2010-10-05 09:51:50 UTC (rev 9747)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteFailoverTest.java 2010-10-05 16:13:58 UTC (rev 9748)
@@ -70,6 +70,24 @@
}
@Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ //just to make sure
+ if (liveServer != null)
+ {
+ try
+ {
+ liveServer.destroy();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
protected TestableServer createLiveServer()
{
return new RemoteProcessHornetQServer(SharedLiveServerConfiguration.class.getName());
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java 2010-10-05 09:51:50 UTC (rev 9747)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/RemoteSingleLiveMultipleBackupsFailoverTest.java 2010-10-05 16:13:58 UTC (rev 9748)
@@ -29,6 +29,7 @@
import org.hornetq.core.server.JournalType;
import org.hornetq.tests.integration.cluster.util.RemoteProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.RemoteServerConfiguration;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
public class RemoteSingleLiveMultipleBackupsFailoverTest extends SingleLiveMultipleBackupsFailoverTest
{
@@ -117,7 +118,26 @@
backups.put(4, SharedBackupServerConfiguration4.class.getName());
backups.put(5, SharedBackupServerConfiguration5.class.getName());
}
-
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ //make sure
+ for (TestableServer testableServer : servers.values())
+ {
+ try
+ {
+ testableServer.destroy();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+ }
+
protected boolean isNetty()
{
return true;
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/FailoverWithSharedStoreTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/FailoverWithSharedStoreTest.java 2010-10-05 09:51:50 UTC (rev 9747)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/remote/FailoverWithSharedStoreTest.java 2010-10-05 16:13:58 UTC (rev 9748)
@@ -171,6 +171,10 @@
locator.close();
}
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
finally
{
if (liveServer != null)
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java 2010-10-05 09:51:50 UTC (rev 9747)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServerSupport.java 2010-10-05 16:13:58 UTC (rev 9748)
@@ -149,10 +149,30 @@
OutputStreamWriter osw = new OutputStreamWriter(serverProcess.getOutputStream());
osw.write("STOP\n");
osw.flush();
- int exitValue = serverProcess.waitFor();
- if (exitValue != 0)
+ int exitValue = -99;
+ long tryTime = System.currentTimeMillis() + 5000;
+ while(true)
{
- serverProcess.destroy();
+ try
+ {
+ exitValue = serverProcess.exitValue();
+ }
+ catch (Exception e)
+ {
+ Thread.sleep(100);
+ }
+ if(exitValue == -99 && System.currentTimeMillis() < tryTime)
+ {
+ continue;
+ }
+ else
+ {
+ if (exitValue != 0)
+ {
+ serverProcess.destroy();
+ }
+ break;
+ }
}
}
13 years, 6 months
JBoss hornetq SVN: r9747 - trunk/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-10-05 05:51:50 -0400 (Tue, 05 Oct 2010)
New Revision: 9747
Added:
trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
Log:
https://jira.jboss.org/browse/HORNETQ-544
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java 2010-10-05 09:51:50 UTC (rev 9747)
@@ -0,0 +1,558 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * A StompDecoder
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class StompDecoder
+{
+ private static final Logger log = Logger.getLogger(StompDecoder.class);
+
+ private static final boolean TRIM_LEADING_HEADER_VALUE_WHITESPACE = true;
+
+ private static final String COMMAND_ABORT = "ABORT";
+
+ private static final int COMMAND_ABORT_LENGTH = COMMAND_ABORT.length();
+
+ private static final String COMMAND_ACK = "ACK";
+
+ private static final int COMMAND_ACK_LENGTH = COMMAND_ACK.length();
+
+ private static final String COMMAND_BEGIN = "BEGIN";
+
+ private static final int COMMAND_BEGIN_LENGTH = COMMAND_BEGIN.length();
+
+ private static final String COMMAND_COMMIT = "COMMIT";
+
+ private static final int COMMAND_COMMIT_LENGTH = COMMAND_COMMIT.length();
+
+ private static final String COMMAND_CONNECT = "CONNECT";
+
+ private static final int COMMAND_CONNECT_LENGTH = COMMAND_CONNECT.length();
+
+ private static final String COMMAND_DISCONNECT = "DISCONNECT";
+
+ private static final int COMMAND_DISCONNECT_LENGTH = COMMAND_DISCONNECT.length();
+
+ private static final String COMMAND_SEND = "SEND";
+
+ private static final int COMMAND_SEND_LENGTH = COMMAND_SEND.length();
+
+ private static final String COMMAND_SUBSCRIBE = "SUBSCRIBE";
+
+ private static final int COMMAND_SUBSCRIBE_LENGTH = COMMAND_SUBSCRIBE.length();
+
+ private static final String COMMAND_UNSUBSCRIBE = "UNSUBSCRIBE";
+
+ private static final int COMMAND_UNSUBSCRIBE_LENGTH = COMMAND_UNSUBSCRIBE.length();
+
+ private static final byte A = (byte)'A';
+
+ private static final byte B = (byte)'B';
+
+ private static final byte C = (byte)'C';
+
+ private static final byte D = (byte)'D';
+
+ private static final byte E = (byte)'E';
+
+ private static final byte M = (byte)'M';
+
+ private static final byte S = (byte)'S';
+
+ private static final byte U = (byte)'U';
+
+ private static final byte HEADER_SEPARATOR = (byte)':';
+
+ private static final byte NEW_LINE = (byte)'\n';
+
+ private static final byte SPACE = (byte)' ';
+
+ private static final byte TAB = (byte)'\t';
+
+ private static String CONTENT_LENGTH_HEADER_NAME = "content-length";
+
+ private byte[] workingBuffer = new byte[1024];
+
+ private int pos;
+
+ private int data;
+
+ private String command;
+
+ private Map<String, Object> headers;
+
+ private int headerBytesCopyStart;
+
+ private boolean readingHeaders;
+
+ private boolean headerValueWhitespace;
+
+ private boolean inHeaderName;
+
+ private String headerName;
+
+ private boolean whiteSpaceOnly;
+
+ private int contentLength;
+
+ private int bodyStart;
+
+ public StompDecoder()
+ {
+ init();
+ }
+
+ public boolean hasBytes()
+ {
+ return data > pos;
+ }
+
+ /*
+ * Stomp format is a command on the first line
+ * followed by a set of headers NAME:VALUE
+ * followed by an empty line
+ * followed by an optional message body
+ * terminated with a null character
+ */
+ public synchronized StompFrame decode(final HornetQBuffer buffer) throws Exception
+ {
+ int readable = buffer.readableBytes();
+
+ if (data + readable >= workingBuffer.length)
+ {
+ resizeWorking(data + readable);
+ }
+
+ buffer.readBytes(workingBuffer, data, readable);
+
+ data += readable;
+
+ if (command == null)
+ {
+ if (data < 4)
+ {
+ // Need at least four bytes to identify the command
+ // - up to 3 bytes for the command name + potentially another byte for a leading \n
+
+ return null;
+ }
+
+ int offset;
+
+ if (workingBuffer[0] == NEW_LINE)
+ {
+ // Yuck, some badly behaved STOMP clients add a \n *after* the terminating NUL char at the end of the
+ // STOMP
+ // frame this can manifest as an extra \n at the beginning when the next STOMP frame is read - we need to
+ // deal
+ // with this
+ offset = 1;
+ }
+ else
+ {
+ offset = 0;
+ }
+
+ byte b = workingBuffer[offset];
+
+ switch (b)
+ {
+ case A:
+ {
+ if (workingBuffer[offset + 1] == B)
+ {
+ if (!tryIncrement(offset + COMMAND_ABORT_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // ABORT
+ command = COMMAND_ABORT;
+ }
+ else
+ {
+ if (!tryIncrement(offset + COMMAND_ACK_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // ACK
+ command = COMMAND_ACK;
+ }
+ break;
+ }
+ case B:
+ {
+ if (!tryIncrement(offset + COMMAND_BEGIN_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // BEGIN
+ command = COMMAND_BEGIN;
+
+ break;
+ }
+ case C:
+ {
+ if (workingBuffer[offset + 2] == M)
+ {
+ if (!tryIncrement(offset + COMMAND_COMMIT_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // COMMIT
+ command = COMMAND_COMMIT;
+ }
+ else
+ {
+ if (!tryIncrement(offset + COMMAND_CONNECT_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // CONNECT
+ command = COMMAND_CONNECT;
+ }
+ break;
+ }
+ case D:
+ {
+ if (!tryIncrement(offset + COMMAND_DISCONNECT_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // DISCONNECT
+ command = COMMAND_DISCONNECT;
+
+ break;
+ }
+ case S:
+ {
+ if (workingBuffer[offset + 1] == E)
+ {
+ if (!tryIncrement(offset + COMMAND_SEND_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // SEND
+ command = COMMAND_SEND;
+ }
+ else
+ {
+ if (!tryIncrement(offset + COMMAND_SUBSCRIBE_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // SUBSCRIBE
+ command = COMMAND_SUBSCRIBE;
+ }
+ break;
+ }
+ case U:
+ {
+ if (!tryIncrement(offset + COMMAND_UNSUBSCRIBE_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // UNSUBSCRIBE
+ command = COMMAND_UNSUBSCRIBE;
+
+ break;
+ }
+ default:
+ {
+ throwInvalid();
+ }
+ }
+
+ // Sanity check
+
+ if (workingBuffer[pos - 1] != NEW_LINE)
+ {
+ throwInvalid();
+ }
+ }
+
+ if (readingHeaders)
+ {
+ if (headerBytesCopyStart == -1)
+ {
+ headerBytesCopyStart = pos;
+ }
+
+ // Now the headers
+
+ outer: while (true)
+ {
+ byte b = workingBuffer[pos++];
+
+ switch (b)
+ {
+ case HEADER_SEPARATOR:
+ {
+ if (inHeaderName)
+ {
+ byte[] data = new byte[pos - headerBytesCopyStart - 1];
+
+ System.arraycopy(workingBuffer, headerBytesCopyStart, data, 0, data.length);
+
+ headerName = new String(data);
+
+ inHeaderName = false;
+
+ headerBytesCopyStart = pos;
+
+ headerValueWhitespace = true;
+ }
+
+ whiteSpaceOnly = false;
+
+ break;
+ }
+ case NEW_LINE:
+ {
+ if (whiteSpaceOnly)
+ {
+ // Headers are terminated by a blank line
+ readingHeaders = false;
+
+ break outer;
+ }
+
+ byte[] data = new byte[pos - headerBytesCopyStart - 1];
+
+ System.arraycopy(workingBuffer, headerBytesCopyStart, data, 0, data.length);
+
+ String headerValue = new String(data);
+
+ headers.put(headerName, headerValue);
+
+ if (headerName.equals(CONTENT_LENGTH_HEADER_NAME))
+ {
+ contentLength = Integer.parseInt(headerValue.toString());
+ }
+
+ whiteSpaceOnly = true;
+
+ headerBytesCopyStart = pos;
+
+ inHeaderName = true;
+
+ headerValueWhitespace = false;
+
+ break;
+ }
+ case SPACE:
+ {
+ }
+ case TAB:
+ {
+ if (TRIM_LEADING_HEADER_VALUE_WHITESPACE && headerValueWhitespace)
+ {
+ // trim off leading whitespace from header values.
+ // The STOMP spec examples seem to imply that whitespace should be trimmed although it is not
+ // explicit in the spec
+ // ActiveMQ + StompConnect also seem to trim whitespace from header values.
+ // Trimming is problematic though if the user has set a header with a value which deliberately
+ // has
+ // leading whitespace since
+ // this will be removed
+ headerBytesCopyStart++;
+ }
+
+ break;
+ }
+ default:
+ {
+ whiteSpaceOnly = false;
+
+ headerValueWhitespace = false;
+ }
+ }
+ if (pos == data)
+ {
+ // Run out of data
+
+ return null;
+ }
+ }
+ }
+
+ // Now the body
+
+ byte[] content = null;
+
+ if (contentLength != -1)
+ {
+ if (pos + contentLength > data)
+ {
+ // Need more bytes
+ }
+ else
+ {
+ content = new byte[contentLength];
+
+ System.arraycopy(workingBuffer, pos, content, 0, contentLength);
+
+ pos += contentLength + 1;
+ }
+ }
+ else
+ {
+ // Need to scan for terminating NUL
+
+ if (bodyStart == -1)
+ {
+ bodyStart = pos;
+ }
+
+ while (pos < data)
+ {
+ if (workingBuffer[pos++] == 0)
+ {
+ content = new byte[pos - bodyStart - 1];
+
+ System.arraycopy(workingBuffer, bodyStart, content, 0, content.length);
+
+ break;
+ }
+ }
+ }
+
+ if (content != null)
+ {
+ if (data > pos)
+ {
+ // More data still in the buffer from the next packet
+
+ System.arraycopy(workingBuffer, pos, workingBuffer, 0, data - pos);
+ }
+
+ data = data - pos;
+
+ // reset
+
+ StompFrame ret = new StompFrame(command, headers, content);
+
+ init();
+
+ return ret;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private void throwInvalid() throws StompException
+ {
+ throw new StompException("Invalid STOMP frame: " + this.dumpByteArray(workingBuffer));
+ }
+
+ private void init()
+ {
+ pos = 0;
+
+ command = null;
+
+ headers = new HashMap<String, Object>();
+
+ this.headerBytesCopyStart = -1;
+
+ readingHeaders = true;
+
+ inHeaderName = true;
+
+ headerValueWhitespace = false;
+
+ headerName = null;
+
+ whiteSpaceOnly = true;
+
+ contentLength = -1;
+
+ bodyStart = -1;
+ }
+
+ private void resizeWorking(final int newSize)
+ {
+ byte[] oldBuffer = workingBuffer;
+
+ workingBuffer = new byte[newSize];
+
+ System.arraycopy(oldBuffer, 0, workingBuffer, 0, oldBuffer.length);
+ }
+
+ private boolean tryIncrement(final int length)
+ {
+ if (pos + length >= data)
+ {
+ return false;
+ }
+ else
+ {
+ pos += length;
+
+ return true;
+ }
+ }
+
+ private String dumpByteArray(final byte[] bytes)
+ {
+ StringBuilder str = new StringBuilder();
+
+ for (int i = 0; i < data; i++)
+ {
+ char b = (char)bytes[i];
+
+ if (b == '\n')
+ {
+ str.append("\\n");
+ }
+ else if (b == 0)
+ {
+ str.append("NUL");
+ }
+ else
+ {
+ str.append(b);
+ }
+
+ if (i != bytes.length - 1)
+ {
+ str.append(",");
+ }
+ }
+
+ return str.toString();
+ }
+}
13 years, 6 months
JBoss hornetq SVN: r9746 - in trunk: src/main/org/hornetq/core/protocol/stomp and 2 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-10-05 05:51:27 -0400 (Tue, 05 Oct 2010)
New Revision: 9746
Removed:
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java
Modified:
trunk/docs/user-manual/en/interoperability.xml
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-544
Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml 2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/docs/user-manual/en/interoperability.xml 2010-10-05 09:51:27 UTC (rev 9746)
@@ -57,6 +57,18 @@
When a Stomp client subscribes (or unsubscribes) for a destination (using a <literal>SUBSCRIBE</literal>
or <literal>UNSUBSCRIBE</literal> frame), the destination is mapped to a HornetQ queue.</para>
</section>
+ <section>
+ <title>STOMP and connection-ttl</title>
+ <para>Well behaved STOMP clients will always send a DISCONNECT frame before closing their connections. In this case the server
+ will clear up any server side resources such as sessions and consumers synchronously. However if STOMP clients exit without
+ sending a DISCONNECT frame or if they crash the server will have no way of knowing immediately whether the client is still alive
+ or not. STOMP connections therefore default to a connection-ttl value of 1 minute (see chapter on <link linkend="connection-ttl"
+ >connection-ttl</link> for more information. This value can be overridden using connection-ttl-override.
+ </para>
+ <note><para>Please note that the STOMP protocol does not contain any heartbeat frame. It is therefore the user's responsibility to make sure
+ data is sent within connection-ttl or the server will assume the client is dead and clean up server side resources.</para></note>
+ </section>
+
<section>
<title>Stomp and JMS interoperabilty</title>
<section>
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-10-05 09:51:27 UTC (rev 9746)
@@ -49,6 +49,13 @@
private boolean valid;
private boolean destroyed = false;
+
+ private StompDecoder decoder = new StompDecoder();
+
+ public StompDecoder getDecoder()
+ {
+ return decoder;
+ }
StompConnection(final Connection transportConnection, final StompProtocolManager manager)
{
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-10-05 09:51:27 UTC (rev 9746)
@@ -22,31 +22,40 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.core.logging.Logger;
/**
* Represents all the data in a STOMP frame.
*
* @author <a href="http://hiramchirino.com">chirino</a>
+ * @author Tim Fox
+ *
*/
class StompFrame
{
+ private static final Logger log = Logger.getLogger(StompFrame.class);
+
public static final byte[] NO_DATA = new byte[] {};
+
private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
private final String command;
+
private final Map<String, Object> headers;
+
private final byte[] content;
-
+
private HornetQBuffer buffer = null;
+
private int size;
-
+
public StompFrame(String command, Map<String, Object> headers, byte[] data)
{
this.command = command;
this.headers = headers;
this.content = data;
}
-
+
public StompFrame(String command, Map<String, Object> headers)
{
this.command = command;
@@ -63,7 +72,7 @@
{
return content;
}
-
+
public Map<String, Object> getHeaders()
{
return headers;
@@ -95,7 +104,8 @@
out += new String(content);
return out;
}
-
+
+
public HornetQBuffer toHornetQBuffer() throws Exception
{
if (buffer == null)
Deleted: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java 2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java 2010-10-05 09:51:27 UTC (rev 9746)
@@ -1,209 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.core.protocol.stomp;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
-
-/**
- * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
- */
-class StompFrameDecoder
-{
- private static final Logger log = Logger.getLogger(StompFrameDecoder.class);
-
- private static final int MAX_COMMAND_LENGTH = 1024;
-
- private static final int MAX_HEADER_LENGTH = 1024 * 10;
-
- private static final int MAX_HEADERS = 1000;
-
- private static final int MAX_DATA_LENGTH = 1024 * 1024 * 10;
-
- public StompFrame decode(HornetQBuffer buffer)
- {
- try
- {
- String command = null;
-
- // skip white space to next real action line
- while (true) {
- command = StompFrameDecoder.readLine(buffer, StompFrameDecoder.MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
- if (command == null) {
- return null;
- }
- else {
- command = command.trim();
- if (command.length() > 0) {
- break;
- }
- }
- }
-
- // Parse the headers
- HashMap<String, Object> headers = new HashMap<String, Object>(25);
- while (true)
- {
- String line = StompFrameDecoder.readLine(buffer, StompFrameDecoder.MAX_HEADER_LENGTH, "The maximum header length was exceeded");
- if (line == null)
- {
- return null;
- }
-
- if (headers.size() > StompFrameDecoder.MAX_HEADERS)
- {
- throw new StompException("The maximum number of headers was exceeded", true);
- }
-
- if (line.trim().length() == 0)
- {
- break;
- }
-
- try
- {
- int seperator_index = line.indexOf(Stomp.Headers.SEPARATOR);
- if (seperator_index == -1)
- {
- return null;
- }
- String name = line.substring(0, seperator_index).trim();
- String value = line.substring(seperator_index + 1, line.length()).trim();
- headers.put(name, value);
- }
- catch (Exception e)
- {
- throw new StompException("Unable to parse header line [" + line + "]", true);
- }
- }
- // Read in the data part.
- byte[] data = StompFrame.NO_DATA;
- String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
- if (contentLength != null)
- {
-
- // Bless the client, he's telling us how much data to read in.
- int length;
- try
- {
- length = Integer.parseInt(contentLength.trim());
- }
- catch (NumberFormatException e)
- {
- throw new StompException("Specified content-length is not a valid integer", true);
- }
-
- if (length > StompFrameDecoder.MAX_DATA_LENGTH)
- {
- throw new StompException("The maximum data length was exceeded", true);
- }
-
- if (buffer.readableBytes() < length)
- {
- return null;
- }
-
- data = new byte[length];
- buffer.readBytes(data);
-
- if (!buffer.readable())
- {
- return null;
- }
- if (buffer.readByte() != 0)
- {
- throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " +
- "there was no trailing null byte", true);
- }
- }
- else
- {
- byte[] body = new byte[StompFrameDecoder.MAX_DATA_LENGTH];
- boolean bodyCorrectlyEnded = false;
- int count = 0;
- while (buffer.readable())
- {
- byte b = buffer.readByte();
-
- if (b == (byte)'\0')
- {
- bodyCorrectlyEnded = true;
- break;
- }
- else
- {
- body[count++] = b;
- }
- }
-
- if (!bodyCorrectlyEnded)
- {
- return null;
- }
-
- data = new byte[count];
- System.arraycopy(body, 0, data, 0, count);
- }
-
- return new StompFrame(command, headers, data);
- }
- catch (IOException e)
- {
- log.error("Unable to decode stomp frame", e);
- return null;
- }
- }
-
- private static String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException
- {
- char[] chars = new char[MAX_HEADER_LENGTH];
-
- if (!in.readable())
- {
- return null;
- }
-
- boolean properString = false;
- int count = 0;
- while (in.readable())
- {
- byte b = in.readByte();
-
- if (b == (byte)'\n')
- {
- properString = true;
- break;
- }
- else
- {
- chars[count++] = (char)b;
- }
- }
- if (properString)
- {
- return new String(chars, 0, count);
- }
- else
- {
- return null;
- }
- }
-}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-05 09:51:27 UTC (rev 9746)
@@ -59,8 +59,6 @@
private final HornetQServer server;
- private final StompFrameDecoder frameDecoder;
-
private final Executor executor;
private final Map<String, StompSession> transactedSessions = new HashMap<String, StompSession>();
@@ -105,7 +103,6 @@
public StompProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
{
this.server = server;
- this.frameDecoder = new StompFrameDecoder();
this.executor = server.getExecutorFactory().getExecutor();
}
@@ -115,8 +112,9 @@
{
StompConnection conn = new StompConnection(connection, this);
- //Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection will be timed out and closed!
-
+ // Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
+ // will be timed out and closed!
+
long ttl = server.getConfiguration().getConnectionTTLOverride();
if (ttl != -1)
@@ -127,7 +125,7 @@
{
// Default to 1 minute - which is same as core protocol
return new ConnectionEntry(conn, System.currentTimeMillis(), 1 * 60 * 1000);
- }
+ }
}
public void removeHandler(String name)
@@ -136,121 +134,123 @@
public int isReadyToHandle(HornetQBuffer buffer)
{
- int start = buffer.readerIndex();
+ // This never gets called
- StompFrame frame = frameDecoder.decode(buffer);
-
- if (frame == null)
- {
- return -1;
- }
- else
- {
- return buffer.readerIndex() - start;
- }
+ return -1;
}
public void handleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
{
- try
- {
- doHandleBuffer(connection, buffer);
- }
- finally
- {
- server.getStorageManager().clearContext();
- }
- }
+ StompConnection conn = (StompConnection)connection;
+
+ StompDecoder decoder = conn.getDecoder();
- private void doHandleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
- {
- StompConnection conn = (StompConnection)connection;
- StompFrame request = null;
- try
+ do
{
- request = frameDecoder.decode(buffer);
- if (log.isTraceEnabled())
+ StompFrame request;
+
+ try
{
- log.trace("received " + request);
+ request = decoder.decode(buffer);
}
+ catch (Exception e)
+ {
+ log.error("Failed to decode", e);
- String command = request.getCommand();
- StompFrame response = null;
-
- if (Stomp.Commands.CONNECT.equals(command))
- {
- response = onConnect(request, conn);
+ return;
}
- else if (Stomp.Commands.DISCONNECT.equals(command))
+
+ if (request == null)
{
- response = onDisconnect(request, conn);
+ return;
}
- else if (Stomp.Commands.SEND.equals(command))
+
+ try
{
- response = onSend(request, conn);
- }
- else if (Stomp.Commands.SUBSCRIBE.equals(command))
- {
- response = onSubscribe(request, conn);
- }
- else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
- {
- response = onUnsubscribe(request, conn);
- }
- else if (Stomp.Commands.ACK.equals(command))
- {
- response = onAck(request, conn);
- }
- else if (Stomp.Commands.BEGIN.equals(command))
- {
- response = onBegin(request, server, conn);
- }
- else if (Stomp.Commands.COMMIT.equals(command))
- {
- response = onCommit(request, conn);
- }
- else if (Stomp.Commands.ABORT.equals(command))
- {
- response = onAbort(request, conn);
- }
- else
- {
- log.error("Unsupported Stomp frame: " + request);
- response = new StompFrame(Stomp.Responses.ERROR,
- new HashMap<String, Object>(),
- ("Unsupported frame: " + command).getBytes());
- }
+ String command = request.getCommand();
- if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
- {
- if (response == null)
+ StompFrame response = null;
+
+ if (Stomp.Commands.CONNECT.equals(command))
{
- Map<String, Object> h = new HashMap<String, Object>();
- response = new StompFrame(Stomp.Responses.RECEIPT, h);
+ response = onConnect(request, conn);
}
- response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
- request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
- }
+ else if (Stomp.Commands.DISCONNECT.equals(command))
+ {
+ response = onDisconnect(request, conn);
+ }
+ else if (Stomp.Commands.SEND.equals(command))
+ {
+ response = onSend(request, conn);
+ }
+ else if (Stomp.Commands.SUBSCRIBE.equals(command))
+ {
+ response = onSubscribe(request, conn);
+ }
+ else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
+ {
+ response = onUnsubscribe(request, conn);
+ }
+ else if (Stomp.Commands.ACK.equals(command))
+ {
+ response = onAck(request, conn);
+ }
+ else if (Stomp.Commands.BEGIN.equals(command))
+ {
+ response = onBegin(request, server, conn);
+ }
+ else if (Stomp.Commands.COMMIT.equals(command))
+ {
+ response = onCommit(request, conn);
+ }
+ else if (Stomp.Commands.ABORT.equals(command))
+ {
+ response = onAbort(request, conn);
+ }
+ else
+ {
+ log.error("Unsupported Stomp frame: " + request);
+ response = new StompFrame(Stomp.Responses.ERROR,
+ new HashMap<String, Object>(),
+ ("Unsupported frame: " + command).getBytes());
+ }
- if (response != null)
- {
- sendReply(conn, response);
+ if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ log.info("receipt requested");
+ if (response == null)
+ {
+ Map<String, Object> h = new HashMap<String, Object>();
+ response = new StompFrame(Stomp.Responses.RECEIPT, h);
+ }
+ response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
+ request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
+ }
+
+ if (response != null)
+ {
+ sendReply(conn, response);
+ }
+
+ if (Stomp.Commands.DISCONNECT.equals(command))
+ {
+ conn.destroy();
+ }
}
-
- if (Stomp.Commands.DISCONNECT.equals(command))
+ catch (Exception e)
{
- conn.destroy();
+ e.printStackTrace();
+ StompFrame error = createError(e, request);
+ if (error != null)
+ {
+ sendReply(conn, error);
+ }
}
- }
- catch (Exception e)
- {
- e.printStackTrace();
- StompFrame error = createError(e, request);
- if (error != null)
+ finally
{
- sendReply(conn, error);
+ server.getStorageManager().clearContext();
}
- }
+ } while (decoder.hasBytes());
}
// Public --------------------------------------------------------
@@ -466,7 +466,8 @@
StompSession stompSession = sessions.get(connection.getID());
if (stompSession == null)
{
- stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
+ stompSession = new StompSession(connection, this, server.getStorageManager()
+ .newContext(server.getExecutorFactory().getExecutor()));
String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name,
connection.getLogin(),
@@ -516,7 +517,7 @@
cleanup(connection);
return null;
}
-
+
private StompFrame onSend(StompFrame frame, StompConnection connection) throws Exception
{
checkConnected(connection);
@@ -554,7 +555,8 @@
{
message.putStringProperty(CONNECTION_ID_PROP, connection.getID().toString());
}
- stompSession.getSession().send(message, true);
+ stompSession.getSession().send(message, true);
+
return null;
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-10-05 09:51:27 UTC (rev 9746)
@@ -93,7 +93,7 @@
HornetQBuffer buffer = serverMessage.getBodyBuffer();
int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
- : serverMessage.getEndOfBodyPosition();
+ : serverMessage.getEndOfBodyPosition();
int size = bodyPos - buffer.readerIndex();
buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
byte[] data = new byte[size];
@@ -108,7 +108,8 @@
if (text != null)
{
data = text.toString().getBytes("UTF-8");
- } else
+ }
+ else
{
data = new byte[0];
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-10-05 09:51:27 UTC (rev 9746)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.client.impl.ClientMessageImpl;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -34,7 +35,10 @@
class StompUtils
{
// Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(StompUtils.class);
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -53,6 +57,7 @@
{
msg.setDurable(Boolean.parseBoolean(persistent));
}
+
// FIXME should use a proper constant
msg.putObjectProperty("JMSCorrelationID", headers.remove(Stomp.Headers.Send.CORRELATION_ID));
msg.putObjectProperty("JMSType", headers.remove(Stomp.Headers.Send.TYPE));
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java 2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java 2010-10-05 09:51:27 UTC (rev 9746)
@@ -38,20 +38,25 @@
*/
public class WebSocketStompFrameEncoder extends OneToOneEncoder
{
-
- private final StompFrameDecoder decoder = new StompFrameDecoder();
-
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception
{
if (msg instanceof ChannelBuffer)
{
+ // FIXME - this is a silly way to do this - a better way to do this would be to create a new protocol, with protocol manager etc
+ // and re-use some of the STOMP codec stuff - Tim
+
+
// this is ugly and slow!
// we have to go ChannelBuffer -> HornetQBuffer -> StompFrame -> String -> WebSocketFrame
// since HornetQ protocol SPI requires to return HornetQBuffer to the transport
HornetQBuffer buffer = new ChannelBufferWrapper((ChannelBuffer)msg);
+
+ StompDecoder decoder = new StompDecoder();
+
StompFrame frame = decoder.decode(buffer);
+
if (frame != null)
{
WebSocketFrame wsFrame = new DefaultWebSocketFrame(frame.asString());
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-10-05 09:51:27 UTC (rev 9746)
@@ -356,8 +356,8 @@
if (protocol == ProtocolType.CORE)
{
// Core protocol uses its own optimised decoder
-
- handlers.put("hornetq-decode", new HornetQFrameDecoder2());
+
+ handlers.put("hornetq-decoder", new HornetQFrameDecoder2());
}
else if (protocol == ProtocolType.STOMP_WS)
{
@@ -367,13 +367,17 @@
handlers.put("hornetq-decoder", new HornetQFrameDecoder(decoder));
handlers.put("websocket-handler", new WebSocketServerHandler());
}
+ else if (protocol == ProtocolType.STOMP)
+ {
+ //With STOMP the decoding is handled in the StompFrame class
+ }
else
{
- handlers.put("hornetq-decoder", new HornetQFrameDecoder(decoder));
+ handlers.put("hornetq-decoder", new HornetQFrameDecoder(decoder));
}
handlers.put("handler", new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
-
+
/**
* STOMP_WS protocol mandates use of named handlers to be able to replace http codecs
* by websocket codecs after handshake.
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-10-05 09:51:27 UTC (rev 9746)
@@ -90,7 +90,7 @@
private JMSServerManager server;
- public void _testSendManyMessages() throws Exception
+ public void testSendManyMessages() throws Exception
{
MessageConsumer consumer = session.createConsumer(queue);
@@ -106,7 +106,7 @@
public void onMessage(Message arg0)
{
- System.out.println("<<< " + (1000 - latch.getCount()));
+ //System.out.println("<<< " + (1000 - latch.getCount()));
latch.countDown();
}
});
@@ -115,12 +115,40 @@
for (int i = 1; i <= count; i++)
{
// Thread.sleep(1);
- System.out.println(">>> " + i);
+ //System.out.println(">>> " + i);
sendFrame(frame);
}
assertTrue(latch.await(60, TimeUnit.SECONDS));
+ }
+
+ public void testPerf() throws Exception
+ {
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ int count = 100000;
+
+ frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "ABCDJIMTEST<GRV>http://techcrunch.com/2010/09/23/thelikestream-digg-for-facebook-likes/<GRV>0" + Stomp.NULL;
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 1; i <= count; i++)
+ {
+ sendFrame(frame);
+
+ if (i % 1000 == 0)
+ {
+ log.info("Sent " + i);
+ }
+ }
+
+ long end = System.currentTimeMillis();
+
+ log.info("That took " + (end-start));
}
public void testConnect() throws Exception
@@ -185,7 +213,7 @@
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame = "\nSEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+ frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
sendFrame(frame);
@@ -199,7 +227,38 @@
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
+
+ /*
+ * Some STOMP clients erroneously put a new line \n *after* the terminating NUL char at the end of the frame
+ * This means next frame read might have a \n a the beginning.
+ * This is contrary to STOMP spec but we deal with it so we can work nicely with crappy STOMP clients
+ */
+ public void testSendMessageWithLeadingNewLine() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL + "\n";
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL + "\n";
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
public void testSendMessageWithReceipt() throws Exception
{
13 years, 6 months
JBoss hornetq SVN: r9745 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 10 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-04 18:20:52 -0400 (Mon, 04 Oct 2010)
New Revision: 9745
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
recovery of cursors
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -16,6 +16,7 @@
import java.util.List;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -60,6 +61,8 @@
boolean page(ServerMessage message) throws Exception;
Page createPage(final int page) throws Exception;
+
+ PageCursorProvider getCursorProvier();
/**
* @return false if a thread was already started, or if not in page mode
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -30,9 +30,24 @@
PagePosition getFirstPosition();
- void ack(PagePosition position);
+ void ack(PagePosition position) throws Exception;
- void ack(long tx, PagePosition position);
+ void ackTx(long tx, PagePosition position) throws Exception;
+
+ /**
+ * @param position
+ */
+ void recoverACK(PagePosition position);
+
+ /**
+ * To be used to avoid a redelivery of a prepared ACK after load
+ * @param position
+ */
+ void recoverPreparedACK(PagePosition position);
+ /**
+ * To be used on redeliveries
+ * @param position
+ */
void returnElement(PagePosition position);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -46,7 +46,7 @@
* @param queueId The cursorID should be the same as the queueId associated for persistance
* @return
*/
- PageCursor createCursor(long queueId);
+ PageCursor getCursor(long queueId);
/**
* Create a non persistent cursor, usually associated with browsing
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -35,11 +35,11 @@
// Attributes ----------------------------------------------------
- private StorageManager store;
+ private final StorageManager store;
private final long cursorId;
- private PagingStore pageStore;
+ private final PagingStore pageStore;
private final PageCursorProvider cursorProvider;
@@ -49,7 +49,10 @@
// Constructors --------------------------------------------------
- public PageCursorImpl(PageCursorProvider cursorProvider, PagingStore pageStore, StorageManager store, long cursorId)
+ public PageCursorImpl(final PageCursorProvider cursorProvider,
+ final PagingStore pageStore,
+ final StorageManager store,
+ final long cursorId)
{
this.pageStore = pageStore;
this.store = store;
@@ -86,22 +89,20 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void ack(PagePosition position)
+ public void ack(final PagePosition position) throws Exception
{
- // TODO Auto-generated method stub
+ store.storeCursorAcknowledge(cursorId, position);
}
-
- public void ack(long tx, PagePosition position)
+
+ public void ackTx(final long tx, final PagePosition position) throws Exception
{
-
+ store.storeCursorAcknowledgeTransactional(tx, cursorId, position);
}
-
-
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
*/
- public void returnElement(PagePosition position)
+ public void returnElement(final PagePosition position)
{
// TODO Auto-generated method stub
@@ -120,7 +121,7 @@
// Protected -----------------------------------------------------
- protected boolean match(ServerMessage message)
+ protected boolean match(final ServerMessage message)
{
return true;
}
@@ -133,6 +134,24 @@
return new PagePositionImpl(firstPage, -1);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#recoverACK(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void recoverACK(final PagePosition position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#recoverPreparedACK(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void recoverPreparedACK(final PagePosition position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor.impl;
import java.util.List;
+import java.util.concurrent.ConcurrentMap;
import org.hornetq.api.core.Pair;
import org.hornetq.core.paging.Page;
@@ -26,9 +27,13 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.SoftValueHashMap;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
/**
* A PageProviderIMpl
+ *
+ * TODO: this may be moved entirely into PagingStore as there's an one-to-one relationship here
+ * However I want to keep this isolated as much as possible during development
*
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
@@ -45,6 +50,8 @@
private final StorageManager storageManager;
private SoftValueHashMap<Long, PageCacheImpl> softCache = new SoftValueHashMap<Long, PageCacheImpl>();
+
+ private ConcurrentMap<Long, PageCursor> activeCursors = new ConcurrentHashMap<Long, PageCursor>();
// Static --------------------------------------------------------
@@ -66,26 +73,23 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
*/
- public PageCursor createCursor(long cursorId)
+ public PageCursor getCursor(long cursorID)
{
- return new PageCursorImpl(this, pagingStore, storageManager, cursorId);
+ PageCursor activeCursor = activeCursors.get(cursorID);
+ if (activeCursor == null)
+ {
+ activeCursor = activeCursors.putIfAbsent(cursorID, new PageCursorImpl(this, pagingStore, storageManager, cursorID));
+ }
+
+ return activeCursor;
}
-
public PageCursor createCursor()
{
return new PageCursorImpl(this, pagingStore, storageManager, 0);
}
/* (non-Javadoc)
- * @see org.hornetq.core.paging.cursor.PageCursorProvider#recoverCursor(org.hornetq.core.paging.cursor.PagePosition)
- */
- public PageCursor recoverCursor(final PagePositionImpl position)
- {
- return null;
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
*/
public Pair<PagePosition, ServerMessage> getAfter(final PagePosition pos) throws Exception
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -13,9 +13,7 @@
package org.hornetq.core.paging.cursor.impl;
-import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.utils.DataConstants;
/**
* A PagePosition
@@ -50,7 +48,7 @@
*/
public PagePositionImpl()
{
-
+
}
/**
@@ -126,4 +124,38 @@
{
return this.pageNr == pos.getPageNr() && this.getRecordID() - pos.getRecordID() == 1;
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + messageNr;
+ result = prime * result + (int)(pageNr ^ (pageNr >>> 32));
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PagePositionImpl other = (PagePositionImpl)obj;
+ if (messageNr != other.messageNr)
+ return false;
+ if (pageNr != other.pageNr)
+ return false;
+ return true;
+ }
+
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -40,6 +40,8 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.PagingStoreFactory;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
@@ -48,8 +50,8 @@
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
/**
@@ -114,6 +116,8 @@
/** duplicate cache used at this address */
private final DuplicateIDCache duplicateCache;
+
+ private final PageCursorProvider cursorProvider;
/**
* We need to perform checks on currentPage with minimal locking
@@ -187,6 +191,8 @@
this.storeFactory = storeFactory;
this.syncNonTransactional = syncNonTransactional;
+
+ this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager);
// Post office could be null on the backup node
if (postOffice == null)
@@ -204,6 +210,11 @@
// PagingStore implementation ------------------------------------
+ public PageCursorProvider getCursorProvier()
+ {
+ return cursorProvider;
+ }
+
public long getFirstPage()
{
return firstPageId;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -151,6 +151,7 @@
final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
+ Map<Long, QueueBindingInfo> queueInfos,
final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception;
long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -50,6 +50,8 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.cursor.PageCursor;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
@@ -136,7 +138,7 @@
public static final byte HEURISTIC_COMPLETION = 38;
- public static final byte ACKNOWLEDGE_PAGING = 39;
+ public static final byte ACKNOWLEDGE_CURSOR = 39;
private UUID persistentID;
@@ -517,7 +519,7 @@
{
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
- messageJournal.appendAddRecord(ackID, ACKNOWLEDGE_PAGING, new CursorAckRecordEncoding(queueID, position), syncNonTransactional, getContext(syncNonTransactional));
+ messageJournal.appendAddRecord(ackID, ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position), syncNonTransactional, getContext(syncNonTransactional));
}
@@ -627,7 +629,7 @@
{
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
- messageJournal.appendAddRecordTransactional(txID, ackID, ACKNOWLEDGE_PAGING, new CursorAckRecordEncoding(queueID, position));
+ messageJournal.appendAddRecordTransactional(txID, ackID, ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position));
}
public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
@@ -786,6 +788,7 @@
final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
+ Map<Long, QueueBindingInfo> queueInfos,
final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -1002,6 +1005,29 @@
resourceManager.putHeuristicCompletion(record.id, encoding.xid, encoding.isCommit);
break;
}
+ case ACKNOWLEDGE_CURSOR:
+ {
+ CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+ encoding.decode(buff);
+
+ encoding.position.setRecordID(record.id);
+
+ QueueBindingInfo queueInfo = queueInfos.get(encoding.queueID);
+
+ if (queueInfo != null)
+ {
+ SimpleString address = queueInfo.getAddress();
+ PagingStore store = pagingManager.getPageStore(address);
+ PageCursor cursor = store.getCursorProvier().getCursor(encoding.queueID);
+ cursor.recoverACK(encoding.position);
+ }
+ else
+ {
+ log.warn("Can't find queue " + queueInfo.getId() + " while reloading ACKNOWLEDGE_CURSOR");
+ }
+
+ break;
+ }
default:
{
throw new IllegalStateException("Invalid record type " + recordType);
@@ -1536,6 +1562,12 @@
break;
}
+ case ACKNOWLEDGE_CURSOR:
+ {
+ // TODO: implement and test this case
+ // and make sure the rollback will work well also
+ break;
+ }
default:
{
JournalStorageManager.log.warn("InternalError: Record type " + recordType +
@@ -2229,12 +2261,8 @@
long scheduledDeliveryTime;
int deliveryCount;
-
- boolean referenced = false;
}
-
-
private static final class CursorAckRecordEncoding implements EncodingSupport
{
public CursorAckRecordEncoding(final long queueID, final PagePosition position)
@@ -2242,6 +2270,11 @@
this.queueID = queueID;
this.position = position;
}
+
+ public CursorAckRecordEncoding()
+ {
+ this.position = new PagePositionImpl();
+ }
long queueID;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -268,6 +268,7 @@
final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
+ Map<Long, QueueBindingInfo> queueInfos,
final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
{
return new JournalLoadInformation();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -61,6 +61,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
@@ -163,7 +164,7 @@
private volatile QueueFactory queueFactory;
private volatile PagingManager pagingManager;
-
+
private volatile PostOffice postOffice;
private volatile ExecutorService threadPool;
@@ -1188,9 +1189,12 @@
setNodeID();
Map<Long, Queue> queues = new HashMap<Long, Queue>();
+ Map<Long, QueueBindingInfo> queueBindingInfosMap = new HashMap<Long, QueueBindingInfo>();
for (QueueBindingInfo queueBindingInfo : queueBindingInfos)
{
+ queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
+
Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
@@ -1226,6 +1230,7 @@
pagingManager,
resourceManager,
queues,
+ queueBindingInfosMap,
duplicateIDMap);
for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet())
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -61,6 +61,7 @@
// Public --------------------------------------------------------
+ // Read more cache than what would fit on the memory, and validate if the memory would be cleared through soft-caches
public void testReadCache() throws Exception
{
@@ -132,6 +133,32 @@
assertNull(cache);
}
+
+
+ public void testRollbackScenarios() throws Exception
+ {
+
+ }
+
+ public void testRedeliveryScenarios() throws Exception
+ {
+
+ }
+
+ public void testCleanupScenarios() throws Exception
+ {
+ // Validate the pages are being cleared (with multiple cursors)
+ }
+
+ public void testLeavePageStateAndRestart() throws Exception
+ {
+ // Validate the cursor are working fine when all the pages are gone, and then paging being restarted
+ }
+
+ public void testRedeliveryWithCleanup() throws Exception
+ {
+
+ }
/**
* @param numMessages
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -76,7 +76,7 @@
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
- journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null);
+ journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null);
assertEquals(98, deletedMessage.size());
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -101,7 +101,7 @@
Map<Long, Queue> queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(postOffice, null, null, queues, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null, null);
journal.stop();
@@ -111,7 +111,7 @@
queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(postOffice, null, null, queues, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null, null);
queueBindingInfos = new ArrayList<QueueBindingInfo>();
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -128,7 +128,7 @@
Map<Long, Queue> queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null);
+ journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null);
}
/**
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -1154,6 +1154,7 @@
final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
+ Map<Long, QueueBindingInfo> queueInfos,
final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
{
return new JournalLoadInformation();
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-10-04 19:35:39 UTC (rev 9744)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2010-10-04 22:20:52 UTC (rev 9745)
@@ -111,6 +111,7 @@
new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
+ null,
mapDups);
Assert.assertEquals(0, mapDups.size());
@@ -132,6 +133,7 @@
new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
+ null,
mapDups);
Assert.assertEquals(1, mapDups.size());
@@ -160,6 +162,7 @@
new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
+ null,
mapDups);
Assert.assertEquals(1, mapDups.size());
13 years, 6 months
JBoss hornetq SVN: r9744 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-04 15:35:39 -0400 (Mon, 04 Oct 2010)
New Revision: 9744
Removed:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Storage of cursors' ack initial implementation
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-04 19:35:39 UTC (rev 9744)
@@ -32,5 +32,7 @@
void ack(PagePosition position);
+ void ack(long tx, PagePosition position);
+
void returnElement(PagePosition position);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-04 19:35:39 UTC (rev 9744)
@@ -41,6 +41,17 @@
PagingStore getAssociatedStore();
+ /**
+ *
+ * @param queueId The cursorID should be the same as the queueId associated for persistance
+ * @return
+ */
+ PageCursor createCursor(long queueId);
+
+ /**
+ * Create a non persistent cursor, usually associated with browsing
+ * @return
+ */
PageCursor createCursor();
// PageCursor recoverCursor(PagePosition position);
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2010-10-04 19:35:39 UTC (rev 9744)
@@ -13,6 +13,7 @@
package org.hornetq.core.paging.cursor;
+
/**
* A PagePosition
*
@@ -23,9 +24,10 @@
public interface PagePosition extends Comparable<PagePosition>
{
+ // The recordID associated during ack
long getRecordID();
- // TODO: this belongs somewhere else
+ // The recordID associated during ack
void setRecordID(long recordID);
long getPageNr();
Deleted: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java 2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/StorageCursor.java 2010-10-04 19:35:39 UTC (rev 9744)
@@ -1,28 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.paging.cursor;
-
-import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
-
-/**
- * A StorageCursor
- *
- * @author clebertsuconic
- *
- *
- */
-public interface StorageCursor
-{
- void storeCursorInitialPosition(PagePositionImpl position);
-}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-04 19:35:39 UTC (rev 9744)
@@ -18,7 +18,7 @@
import org.hornetq.core.paging.cursor.PageCursor;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.paging.cursor.StorageCursor;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
/**
@@ -35,10 +35,12 @@
// Attributes ----------------------------------------------------
- private StorageCursor store;
+ private StorageManager store;
+ private final long cursorId;
+
private PagingStore pageStore;
-
+
private final PageCursorProvider cursorProvider;
private volatile PagePosition lastPosition;
@@ -47,11 +49,12 @@
// Constructors --------------------------------------------------
- public PageCursorImpl(PageCursorProvider cursorProvider, PagingStore pageStore, StorageCursor store)
+ public PageCursorImpl(PageCursorProvider cursorProvider, PagingStore pageStore, StorageManager store, long cursorId)
{
this.pageStore = pageStore;
this.store = store;
this.cursorProvider = cursorProvider;
+ this.cursorId = cursorId;
}
// Public --------------------------------------------------------
@@ -65,15 +68,15 @@
{
lastPosition = recoverLastPosition();
}
-
- Pair<PagePosition,ServerMessage> message = null;
+
+ Pair<PagePosition, ServerMessage> message = null;
do
{
- message = cursorProvider.getAfter(lastPosition);
- if (message != null)
- {
- lastPosition = message.a;
- }
+ message = cursorProvider.getAfter(lastPosition);
+ if (message != null)
+ {
+ lastPosition = message.a;
+ }
}
while (message != null && !match(message.b));
@@ -86,8 +89,14 @@
public void ack(PagePosition position)
{
// TODO Auto-generated method stub
-
}
+
+ public void ack(long tx, PagePosition position)
+ {
+
+ }
+
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
@@ -110,7 +119,7 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+
protected boolean match(ServerMessage message)
{
return true;
@@ -123,7 +132,7 @@
long firstPage = pageStore.getFirstPage();
return new PagePositionImpl(firstPage, -1);
}
-
+
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-04 19:35:39 UTC (rev 9744)
@@ -66,9 +66,15 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#createCursor()
*/
+ public PageCursor createCursor(long cursorId)
+ {
+ return new PageCursorImpl(this, pagingStore, storageManager, cursorId);
+ }
+
+
public PageCursor createCursor()
{
- return new PageCursorImpl(this, pagingStore, null);
+ return new PageCursorImpl(this, pagingStore, storageManager, 0);
}
/* (non-Javadoc)
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-04 19:35:39 UTC (rev 9744)
@@ -13,7 +13,9 @@
package org.hornetq.core.paging.cursor.impl;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.utils.DataConstants;
/**
* A PagePosition
@@ -43,6 +45,15 @@
}
/**
+ * @param pageNr
+ * @param messageNr
+ */
+ public PagePositionImpl()
+ {
+
+ }
+
+ /**
* @return the recordID
*/
public long getRecordID()
@@ -115,5 +126,4 @@
{
return this.pageNr == pos.getPageNr() && this.getRecordID() - pos.getRecordID() == 1;
}
-
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java 2010-10-04 19:35:39 UTC (rev 9744)
@@ -13,7 +13,6 @@
package org.hornetq.core.persistence;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -27,6 +26,7 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.postoffice.Binding;
@@ -98,6 +98,8 @@
void deleteMessage(long messageID) throws Exception;
void storeAcknowledge(long queueID, long messageID) throws Exception;
+
+ void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception;
void updateDeliveryCount(MessageReference ref) throws Exception;
@@ -113,6 +115,8 @@
void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception;
+ void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception;
+
void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception;
void deleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-04 19:35:39 UTC (rev 9744)
@@ -50,6 +50,8 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.OperationContext;
@@ -134,6 +136,8 @@
public static final byte HEURISTIC_COMPLETION = 38;
+ public static final byte ACKNOWLEDGE_PAGING = 39;
+
private UUID persistentID;
private final BatchingIDGenerator idGenerator;
@@ -508,7 +512,16 @@
syncNonTransactional,
getContext(syncNonTransactional));
}
+
+ public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
+ {
+ long ackID = idGenerator.generateID();
+ position.setRecordID(ackID);
+ messageJournal.appendAddRecord(ackID, ACKNOWLEDGE_PAGING, new CursorAckRecordEncoding(queueID, position), syncNonTransactional, getContext(syncNonTransactional));
+ }
+
+
public void deleteMessage(final long messageID) throws Exception
{
// Messages are deleted on postACK, one after another.
@@ -607,6 +620,16 @@
new RefEncoding(queueID));
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledgeTransactional(long, long, org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception
+ {
+ long ackID = idGenerator.generateID();
+ position.setRecordID(ackID);
+ messageJournal.appendAddRecordTransactional(txID, ackID, ACKNOWLEDGE_PAGING, new CursorAckRecordEncoding(queueID, position));
+ }
+
public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
{
long id = generateUniqueID();
@@ -2192,6 +2215,7 @@
}
}
+
private static final class AddMessageRecord
{
@@ -2209,6 +2233,50 @@
boolean referenced = false;
}
+
+
+ private static final class CursorAckRecordEncoding implements EncodingSupport
+ {
+ public CursorAckRecordEncoding(final long queueID, final PagePosition position)
+ {
+ this.queueID = queueID;
+ this.position = position;
+ }
+
+ long queueID;
+
+ PagePosition position;
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeLong(queueID);
+ buffer.writeLong(position.getPageNr());
+ buffer.writeInt(position.getMessageNr());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ queueID = buffer.readLong();
+ long pageNR = buffer.readLong();
+ int messageNR = buffer.readInt();
+ this.position = new PagePositionImpl(pageNR, messageNR);
+ }
+ }
+
private class LargeMessageTXFailureCallback implements TransactionFailureCallback
{
private final Map<Long, ServerMessage> messages;
@@ -2245,5 +2313,4 @@
}
}
-
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-10-04 19:35:39 UTC (rev 9744)
@@ -31,6 +31,7 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
@@ -450,4 +451,22 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledge(long, org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void storeCursorAcknowledge(long queueID, PagePosition position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledgeTransactional(long, long, org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-04 16:14:36 UTC (rev 9743)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-04 19:35:39 UTC (rev 9744)
@@ -46,6 +46,7 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.PagingStoreFactory;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
@@ -1501,6 +1502,24 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledge(long, org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void storeCursorAcknowledge(long queueID, PagePosition position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledgeTransactional(long, long, org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
13 years, 6 months