[jboss-cvs] JBoss Messaging SVN: r2449 - in trunk: src/etc/server/default/deploy and 13 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Feb 26 14:25:34 EST 2007
Author: timfox
Date: 2007-02-26 14:25:34 -0500 (Mon, 26 Feb 2007)
New Revision: 2449
Added:
trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
trunk/tests/src/org/jboss/test/messaging/jms/manual/FailoverTest.java
Removed:
trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java
Modified:
trunk/src/etc/aop-messaging-client.xml
trunk/src/etc/server/default/deploy/clustered-db2-persistence-service.xml
trunk/src/etc/server/default/deploy/clustered-mssql-persistence-service.xml
trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
trunk/src/etc/server/default/deploy/clustered-oracle-persistence-service.xml
trunk/src/etc/server/default/deploy/clustered-postgresql-persistence-service.xml
trunk/src/etc/server/default/deploy/clustered-sybase-persistence-service.xml
trunk/src/etc/server/default/deploy/db2-persistence-service.xml
trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
trunk/src/main/org/jboss/jms/client/FailoverValve2.java
trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
Fixed consumer.close() and queue merging bugs
Modified: trunk/src/etc/aop-messaging-client.xml
===================================================================
--- trunk/src/etc/aop-messaging-client.xml 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/aop-messaging-client.xml 2007-02-26 19:25:34 UTC (rev 2449)
@@ -49,13 +49,16 @@
<interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
- </bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->start())">
+ </bind>
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->$implementing{org.jboss.jms.server.endpoint.ConnectionEndpoint}(..))">
+ <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
+ </bind>
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->start())">
<advice name="handleStart" aspect="org.jboss.jms.client.container.ConnectionAspect"/>
- </bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->stop())">
- <advice name="handleStop" aspect="org.jboss.jms.client.container.ConnectionAspect"/>
- </bind>
+ </bind>
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->stop())">
+ <advice name="handleStop" aspect="org.jboss.jms.client.container.ConnectionAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->createConnectionConsumer(..))">
<advice name="handleCreateConnectionConsumer" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
@@ -89,9 +92,6 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->createSessionDelegate(..))">
<advice name="handleCreateSessionDelegate" aspect="org.jboss.jms.client.container.StateCreationAspect"/>
</bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->$implementing{org.jboss.jms.server.endpoint.ConnectionEndpoint}(..))">
- <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- </bind>
<!--
Session Stack
@@ -100,8 +100,11 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->$implementing{org.jboss.jms.delegate.SessionDelegate}(..))">
<interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
- <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
+ <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
</bind>
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->$implementing{org.jboss.jms.server.endpoint.SessionEndpoint}(..))">
+ <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createMessage())">
<advice name="handleCreateMessage" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
@@ -183,9 +186,6 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createBrowserDelegate(..))">
<advice name="handleCreateBrowserDelegate" aspect="org.jboss.jms.client.container.StateCreationAspect"/>
</bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->$implementing{org.jboss.jms.server.endpoint.SessionEndpoint}(..))">
- <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- </bind>
<!--
Consumer Stack
@@ -195,7 +195,10 @@
<interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
- </bind>
+ </bind>
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->$implementing{org.jboss.jms.server.endpoint.ConsumerEndpoint}(..))">
+ <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getMessageListener())">
<advice name="handleGetMessageListener" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
@@ -219,10 +222,7 @@
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getMessageSelector())">
<advice name="handleGetMessageSelector" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
- </bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->$implementing{org.jboss.jms.server.endpoint.ConsumerEndpoint}(..))">
- <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- </bind>
+ </bind>
<!--
@@ -287,14 +287,14 @@
<interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
+ </bind>
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->$implementing{org.jboss.jms.server.endpoint.BrowserEndpoint}(..))">
+ <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->nextMessage())">
<advice name="handleNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->hasNextMessage())">
<advice name="handleHasNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
- </bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->$implementing{org.jboss.jms.server.endpoint.BrowserEndpoint}(..))">
- <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>
- </bind>
+ </bind>
</aop>
\ No newline at end of file
Modified: trunk/src/etc/server/default/deploy/clustered-db2-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-db2-persistence-service.xml 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/clustered-db2-persistence-service.xml 2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+ LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
Modified: trunk/src/etc/server/default/deploy/clustered-mssql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mssql-persistence-service.xml 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/clustered-mssql-persistence-service.xml 2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+ LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
Modified: trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+ LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
Modified: trunk/src/etc/server/default/deploy/clustered-oracle-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-oracle-persistence-service.xml 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/clustered-oracle-persistence-service.xml 2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+ LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
Modified: trunk/src/etc/server/default/deploy/clustered-postgresql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-postgresql-persistence-service.xml 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/clustered-postgresql-persistence-service.xml 2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+ LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
Modified: trunk/src/etc/server/default/deploy/clustered-sybase-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-sybase-persistence-service.xml 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/clustered-sybase-persistence-service.xml 2007-02-26 19:25:34 UTC (rev 2449)
@@ -39,6 +39,7 @@
ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+ LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
Modified: trunk/src/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/db2-persistence-service.xml 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/db2-persistence-service.xml 2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+ LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
Modified: trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+ LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+ LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
Modified: trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+ LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
Modified: trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+ LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
Modified: trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-02-26 19:25:34 UTC (rev 2449)
@@ -39,6 +39,7 @@
ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+ LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -85,8 +85,6 @@
valve.close();
- log.debug(this + " starting client-side failover");
-
synchronized(this)
{
// testing for failed connection and setting the failed flag need to be done in one
@@ -102,6 +100,9 @@
remotingConnection.setFailed();
}
+ //Note - failover doesn't occur until _after_ the above check - so the next comment belongs here
+ log.debug(this + " starting client-side failover");
+
// generate a FAILOVER_STARTED event. The event must be broadcasted AFTER valve closure,
// to insure the client-side stack is in a deterministic state
broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_STARTED, this));
Modified: trunk/src/main/org/jboss/jms/client/FailoverValve2.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverValve2.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/FailoverValve2.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -39,16 +39,30 @@
private static final Logger log = Logger.getLogger(FailoverValve2.class);
+
// Static ---------------------------------------------------------------------------------------
private static boolean trace = log.isTraceEnabled();
// Attributes -----------------------------------------------------------------------------------
+ // Only use this in trace mode
+ private Set threads;
+
private int count;
+
private boolean locked;
- private Set threads = new HashSet();
+ public FailoverValve2()
+ {
+ trace = log.isTraceEnabled();
+
+ if (trace)
+ {
+ threads = new HashSet();
+ }
+ }
+
// Constructors ---------------------------------------------------------------------------------
// Public ---------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -171,9 +171,15 @@
return invocation.invokeNext();
}
finally
- {
+ {
if (isClosing)
{
+ //We make sure we remove ourself AFTER the invocation has been made
+ //otherwise in a failover situation we would end up divorced from the hierarchy
+ //and failover will not occur properly since failover would not be able to
+ //traverse the hierarchy and update the delegates properly
+ removeSelf(invocation);
+
closing();
}
else if (isClose)
@@ -297,11 +303,20 @@
}
}
}
-
- // Remove from the parent
+ }
+
+ /**
+ * Remove from parent
+ *
+ * @param invocation the invocation
+ */
+ protected void removeSelf(Invocation invocation)
+ {
+ HierarchicalState state = ((DelegateSupport)invocation.getTargetObject()).getState();
+
HierarchicalState parent = state.getParent();
if (parent != null)
- {
+ {
parent.getChildren().remove(state);
}
}
Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -34,6 +34,7 @@
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.util.MessageQueueNameHelper;
+import org.jboss.logging.Logger;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -53,6 +54,8 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ConsumerAspect.class);
+
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
@@ -114,11 +117,21 @@
{
ConsumerState consumerState = getState(invocation);
-
// First we call close on the messagecallbackhandler which waits for onMessage invocations
// to complete any further messages received will be ignored
consumerState.getMessageCallbackHandler().close();
+
+ // Then we make sure closing is called on the ServerConsumerEndpoint.
+
+ Object res = invocation.invokeNext();
+ //Now we send a message to the server consumer with the last delivery id so
+ //it can cancel any inflight messages after that
+ //This needs to be done *after* the call to closing has been executed on the server
+ //maybe it can be combined with closing
+
+ ConsumerDelegate del = (ConsumerDelegate)invocation.getTargetObject();
+
long lastDeliveryId = consumerState.getMessageCallbackHandler().getLastDeliveryId();
SessionState sessionState = (SessionState)consumerState.getParent();
@@ -128,17 +141,6 @@
CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
cm.unregisterHandler(consumerState.getConsumerID());
-
- // Then we make sure closing is called on the ServerConsumerEndpoint.
-
- Object res = invocation.invokeNext();
-
- //Now we send a message to the server consumer with the last delivery id so
- //it can cancel any inflight messages after that
- //This needs to be done *after* the call to closing has been executed on the server
- //maybe it can be combined with closing
-
- ConsumerDelegate del = (ConsumerDelegate)invocation.getTargetObject();
//Now we need to cancel any inflight messages - this must be done before
//cancelling the message callback handler buffer, so that messages end up back in the channel
Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -150,7 +150,7 @@
// Protected ------------------------------------------------------------------------------------
protected Object doInvoke(Client client, RequestSupport req) throws JMSException
- {
+ {
return doInvoke(client, req, false);
}
@@ -177,6 +177,7 @@
}
else
{
+
if (trace) { log.trace(this + " invoking " + req + " synchronously on server using " + client); }
resp = client.invoke(req);
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -241,7 +241,7 @@
//TODO - we temporarily need to execute on a different thread to avoid a deadlock situation in
// failover where a message is sent then the valve is locked, and the message send cause
// a message delivery back to the same client which tries to ack but can't get through
- // the valve.
+ // the valve. This won't be necessary when we move to a non blocking transport
this.sessionExecutor.execute(
new Runnable()
{
@@ -259,7 +259,7 @@
});
}
- public void handleMessageInternal(Object message) throws Exception
+ private void handleMessageInternal(Object message) throws Exception
{
MessageProxy proxy = (MessageProxy) message;
Modified: trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConnectionState.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/state/ConnectionState.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -153,12 +153,13 @@
idGenerator = newState.idGenerator;
serverID = newState.serverID;
versionToUse = newState.versionToUse;
-
+
ConnectionDelegate newDelegate = (ConnectionDelegate)newState.getDelegate();
-
+
for(Iterator i = getChildren().iterator(); i.hasNext(); )
- {
+ {
SessionState sessionState = (SessionState)i.next();
+
ClientSessionDelegate sessionDelegate = (ClientSessionDelegate)sessionState.getDelegate();
// create a new session on the new connection for each session on the old connection
Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -526,8 +526,10 @@
firstPagingOrder = ili.getMinPageOrdering().longValue();
nextPagingOrder = ili.getMaxPageOrdering().longValue() + 1;
+
+ paging = true;
- paging = true;
+ log.info("set paging to true");
}
else
{
Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -1017,9 +1017,9 @@
}
}
- public InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int fullSize) throws Exception
+ public InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int numberToLoad, long firstPagingOrder, long nextPagingOrder) throws Exception
{
- if (trace) { log.trace("Merging channel from " + fromChannelID + " to " + toChannelID); }
+ if (trace) { log.trace("Merging channel from " + fromChannelID + " to " + toChannelID + " numberToLoad:" + numberToLoad + " firstPagingOrder:" + firstPagingOrder + " nextPagingOrder:" + nextPagingOrder); }
Connection conn = null;
PreparedStatement ps = null;
@@ -1031,96 +1031,139 @@
{
conn = ds.getConnection();
- // First swap the channel id
+ /*
+ * If channel is paging and has full size f
+ *
+ * then we don't need to load any refs but we need to:
+ *
+ * make sure the page ord is correct across the old paged and new refs
+ *
+ * we know the max page ord (from the channel) for the old refs so we just need to:
+ *
+ * 1) Iterate through the failed channel and update page_ord = max + 1, max + 2 etc
+ *
+ * 2) update channel id
+ *
+ *
+ * If channel is not paging and the total refs before and after <=f
+ *
+ * 1) Load all refs from failed channel
+ *
+ * 2) Update channel id
+ *
+ * return those refs
+ *
+ *
+ * If channel is not paging but total new refs > f
+ *
+ * 1) Iterate through failed channel refs and take the first x to make the channel full
+ *
+ * 2) Update the others with page_ord starting at zero
+ *
+ * 3) Update channel id
+ *
+ * In general:
+ *
+ * We have number to load n, max page size p
+ *
+ * 1) Iterate through failed channel refs in page_ord order
+ *
+ * 2) Put the first n in a List.
+ *
+ * 3) Initialise page_ord_count to be p or 0 depending on whether it was specified
+ *
+ * 4) Update the page_ord of the remaining refs accordiningly
+ *
+ * 5) Update the channel id
+ *
+ */
- ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+ //First load the refs from the failed channel
+
+ List refs = new ArrayList();
- ps.setLong(1, toChannelID);
+ ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
- ps.setLong(2, fromChannelID);
-
- int rows = updateWithRetry(ps);
-
- if (trace) { log.trace("Update channel id updated " + rows + " rows"); }
-
- //Now set any pages refs to not paged
-
- ps = conn.prepareStatement(getSQLStatement("UPDATE_REFS_NOT_PAGED"));
+ ps.setLong(1, fromChannelID);
- ps.setLong(1, 0);
-
- ps.setLong(2, Integer.MAX_VALUE);
-
- ps.setLong(3, toChannelID);
-
- rows = updateWithRetry(ps);
-
- if (trace) { log.trace(" Set paged refs updated " + rows + " rows"); }
-
- //Now load the refs
-
- ps = conn.prepareStatement(getSQLStatement("LOAD_UNPAGED_REFS"));
-
- ps.setLong(1, toChannelID);
-
rs = ps.executeQuery();
- List refs = new ArrayList();
-
int count = 0;
- int pageOrd = 0;
boolean arePaged = false;
+ long pageOrd = nextPagingOrder;
+
while (rs.next())
{
long msgId = rs.getLong(1);
int deliveryCount = rs.getInt(2);
long sched = rs.getLong(3);
- ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
+ if (count < numberToLoad)
+ {
+ ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
+
+ refs.add(ri);
+ }
- if (count < fullSize)
+ // Set page ord
+
+ if (ps2 == null)
{
- refs.add(ri);
- }
- else
+ ps2 = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+ }
+
+ if (count < numberToLoad)
{
- //These ones need to be made paged
-
- if (ps2 == null)
- {
- ps2 = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
-
- ps2.setLong(1, pageOrd);
-
- ps2.setLong(2, msgId);
-
- ps2.setLong(3, toChannelID);
-
- rows = updateWithRetry(ps2);
-
- if (trace) { log.trace("Update page ord updated " + rows + " rows"); }
-
- pageOrd++;
-
- arePaged = true;
-
- }
+ ps2.setNull(1, Types.BIGINT);
+
+ if (trace) { log.trace("Set page ord to null"); }
}
+ else
+ {
+ ps2.setLong(1, pageOrd);
+
+ if (trace) { log.trace("Set page ord to " + pageOrd); }
+
+ arePaged = true;
+
+ pageOrd++;
+ }
- count++;
+ ps2.setLong(2, msgId);
+
+ ps2.setLong(3, fromChannelID);
+
+ int rows = updateWithRetry(ps2);
+
+ if (trace) { log.trace("Update page ord updated " + rows + " rows"); }
+
+ count++;
}
+ ps.close();
+
+ // Now swap the channel id
+
+ ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+
+ ps.setLong(1, toChannelID);
+
+ ps.setLong(2, fromChannelID);
+
+ int rows = updateWithRetry(ps);
+
+ if (trace) { log.trace("Update channel id updated " + rows + " rows"); }
+
if (arePaged)
- {
- return new InitialLoadInfo(new Long(0), new Long(pageOrd - 1), refs);
+ {
+ return new InitialLoadInfo(new Long(firstPagingOrder), new Long(pageOrd - 1), refs);
}
else
{
return new InitialLoadInfo(null, null, refs);
- }
-
+ }
}
catch (Exception e)
{
@@ -3406,6 +3449,9 @@
map.put("LOAD_UNPAGED_REFS",
"SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' " +
"AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD");
+ map.put("LOAD_REFS",
+ "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' " +
+ "AND CHANNEL_ID = ? ORDER BY ORD");
map.put("UPDATE_REFS_NOT_PAGED", "UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?");
map.put("SELECT_MIN_MAX_PAGE_ORD", "SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?");
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -70,7 +70,7 @@
InitialLoadInfo loadFromStart(long channelID, int fullSize) throws Exception;
- InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int fullSize) throws Exception;
+ InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int numberToLoad, long firstPagingOrder, long nextPagingOrder) throws Exception;
List getMessages(List messageIds) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -233,15 +233,24 @@
{
if (trace) { log.trace("Merging queue " + remoteQueue + " into " + this); }
+ log.info("queue is paging:" + this.paging + " message refs size " +
+ this.messageRefs.size() + " fullsize:" + this.fullSize +
+ " delivering:" + this.deliveringCount.get());
+
synchronized (refLock)
{
flushDownCache();
PersistenceManager.InitialLoadInfo ili =
- pm.mergeAndLoad(remoteQueue.getChannelID(), this.channelID, fullSize - messageRefs.size());
+ pm.mergeAndLoad(remoteQueue.getChannelID(), channelID, fullSize - messageRefs.size(), firstPagingOrder, nextPagingOrder);
- doLoad(ili);
+ if (trace) { log.trace("Loaded " + ili.getRefInfos().size() + " refs"); }
+
+ log.info("firstpageord:" + ili.getMinPageOrdering() + " lastpageord:" + ili.getMaxPageOrdering());
+ doLoad(ili);
+
+ deliverInternal();
}
}
Deleted: trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -1,172 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms;
-
-import java.util.Hashtable;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.InitialContext;
-
-import junit.framework.TestCase;
-
-import org.jboss.logging.Logger;
-
-public class FailoverTest extends TestCase
-{
- private static final Logger log = Logger.getLogger(FailoverTest.class);
-
-
- public FailoverTest(String name)
- {
- super(name);
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- public void testNOOP()
- {
-
- }
-
-// public void testSendReceive() throws Exception
-// {
-// Hashtable properties = new Hashtable();
-//
-// properties.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
-//
-// properties.put("java.naming.provider.url", "jnp://192.168.1.11:1199");
-//
-// properties.put("java.naming.factory.url", "org.jnp.interfaces");
-//
-// log.info("Creaing ic");
-//
-// InitialContext ic = new InitialContext(properties);
-//
-// log.info("************ REMOTE");
-//
-// Connection conn = null;
-//
-// try
-// {
-// log.info("Created ic");
-//
-// Queue queue = (Queue)ic.lookup("/queue/testDistributedQueue");
-//
-// log.info("Looked up queue");
-//
-// ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
-//
-// log.info("Looked up cf");
-//
-// conn = cf.createConnection();
-//
-// Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// Session sessCons = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageConsumer cons = sessCons.createConsumer(queue);
-//
-// MessageListener list = new MyListener();
-//
-// cons.setMessageListener(list);
-//
-// conn.start();
-//
-// MessageProducer prod = sessSend.createProducer(queue);
-//
-// prod.setDeliveryMode(DeliveryMode.PERSISTENT);
-//
-// int count = 0;
-//
-// while (true)
-// {
-// TextMessage tm = sessSend.createTextMessage("message " + count);
-//
-// prod.send(tm);
-//
-// log.info("sent " + count);
-//
-// count++;
-//
-// //Thread.sleep(250);
-// }
-//
-//
-// }
-// catch (Exception e)
-// {
-// log.error("Failed", e);
-// throw e;
-// }
-// finally
-// {
-//// if (conn != null)
-//// {
-//// log.info("closing connetion");
-//// try
-//// {
-//// conn.close();
-//// }
-//// catch (Exception ignore)
-//// {
-//// }
-//// log.info("closed connection");
-//// }
-// }
-// }
-
- class MyListener implements MessageListener
- {
-
- public void onMessage(Message msg)
- {
- try
- {
- TextMessage tm = (TextMessage)msg;
-
- log.info("Received message " + tm.getText());
- }
- catch (Exception e)
- {
- log.error("Failed to receive", e);
- }
- }
-
- }
-}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -1895,218 +1895,8 @@
}
}
- public void testMergeQueue() throws Exception
- {
- Connection conn0 = null;
- Connection conn1 = null;
+
- int numberOfMessagesReceived = 0;
-
- try
- {
-
- // Objects Server0
- conn0 = cf.createConnection();
-
- assertEquals(0, ((JBossConnection)conn0).getServerID());
-
- Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
-
- conn0.start();
-
- MessageProducer producer0 = session0.createProducer(queue[0]);
-
- producer0.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- MessageConsumer consumer0 = session0.createConsumer(queue[0]);
-
- for (int i=0; i<10; i++)
- {
- producer0.send(session0.createTextMessage("message " + i));
- }
-
- session0.commit();
-
- TextMessage msg;
-
- do
- {
- msg = (TextMessage)consumer0.receive(5000);
- if (msg!=null)
- {
- log.info("msg = " + msg.getText());
- numberOfMessagesReceived++;
- }
- if (numberOfMessagesReceived==5)
- {
- break;
- }
- } while (msg!=null);
-
- session0.commit();
- consumer0.close();
-
-
- // Objects Server1
- conn1 = cf.createConnection();
-
- assertEquals(1, ((JBossConnection)conn1).getServerID());
-
- conn1.start();
-
- Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer producer1 = session1.createProducer(this.queue[1]);
-
- producer1.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- for (int i=10; i<20; i++)
- {
- producer1.send(session0.createTextMessage("message " + i));
- }
-
- ServerManagement.killAndWait(1);
-
- consumer0 = session0.createConsumer(queue[0]);
- do
- {
- msg = (TextMessage)consumer0.receive(5000);
- if (msg!=null)
- {
- log.info("msg = " + msg.getText());
- numberOfMessagesReceived++;
- }
- } while (msg!=null);
-
- session0.commit();
-
- assertEquals(20, numberOfMessagesReceived);
- }
- finally
- {
- if (conn0!=null)
- {
- conn0.close();
- }
-
- if (conn1!=null)
- {
- conn1.close();
- }
- }
- }
-
- public void testMergeQueue2() throws Exception
- {
- Connection conn0 = null;
- Connection conn1 = null;
-
- int numberOfMessagesReceived = 0;
-
- try
- {
-
- // Objects Server0
- conn0 = cf.createConnection();
-
- assertEquals(0, ((JBossConnection)conn0).getServerID());
-
- Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
-
- conn0.start();
-
- MessageProducer producer0 = session0.createProducer(queue[0]);
-
- producer0.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- MessageConsumer consumer0 = session0.createConsumer(queue[0]);
-
- for (int i=0; i<10; i++)
- {
- producer0.send(session0.createTextMessage("message " + i));
- }
-
- session0.commit();
-
- TextMessage msg;
-
- do
- {
- msg = (TextMessage)consumer0.receive(5000);
- if (msg!=null)
- {
- log.info("msg = " + msg.getText());
- numberOfMessagesReceived++;
- }
- if (numberOfMessagesReceived==5)
- {
- break;
- }
- } while (msg!=null);
-
- session0.commit();
- consumer0.close();
-
-
- // Objects Server1
- conn1 = cf.createConnection();
-
- assertEquals(1, ((JBossConnection)conn1).getServerID());
-
- conn1.start();
-
- Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
-
- MessageProducer producer1 = session1.createProducer(this.queue[1]);
-
- producer1.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- for (int i=10; i<20; i++)
- {
- producer1.send(session0.createTextMessage("message " + i));
- }
-
- session1.commit();
-
- MessageConsumer consumer1 = session1.createConsumer(queue[0]);
-
- ServerManagement.killAndWait(1);
-
- TextMessage msgAfterKill = (TextMessage)consumer1.receive(1000);
- assertNotNull(msgAfterKill);
- session1.commit();
-
- consumer1.close();
-
- consumer0 = session0.createConsumer(queue[0]);
- do
- {
- msg = (TextMessage)consumer0.receive(5000);
- if (msg!=null)
- {
- log.info("msg = " + msg.getText());
- numberOfMessagesReceived++;
- }
- } while (msg!=null);
-
- session0.commit();
-
- assertEquals(19, numberOfMessagesReceived);
- }
- finally
- {
- if (conn0!=null)
- {
- conn0.close();
- }
-
- if (conn1!=null)
- {
- conn1.close();
- }
- }
- }
-
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Added: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -0,0 +1,689 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+
+/**
+ *
+ * A MergeQueueTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MergeQueueTest extends ClusteringTestBase
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public MergeQueueTest(String name)
+ {
+ super(name);
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public void testMergeQueue() throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+
+ int numberOfMessagesReceived = 0;
+
+ try
+ {
+
+ // Objects Server0
+ conn0 = cf.createConnection();
+
+ assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+ Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
+
+ conn0.start();
+
+ MessageProducer producer0 = session0.createProducer(queue[0]);
+
+ producer0.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ MessageConsumer consumer0 = session0.createConsumer(queue[0]);
+
+ for (int i=0; i<10; i++)
+ {
+ producer0.send(session0.createTextMessage("message " + i));
+ }
+
+ session0.commit();
+
+ TextMessage msg;
+
+ do
+ {
+ msg = (TextMessage)consumer0.receive(5000);
+ if (msg!=null)
+ {
+ log.info("msg = " + msg.getText());
+ numberOfMessagesReceived++;
+ }
+ if (numberOfMessagesReceived==5)
+ {
+ break;
+ }
+ } while (msg!=null);
+
+ session0.commit();
+ consumer0.close();
+
+
+ // Objects Server1
+ conn1 = cf.createConnection();
+
+ assertEquals(1, ((JBossConnection)conn1).getServerID());
+
+ conn1.start();
+
+ Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer1 = session1.createProducer(this.queue[1]);
+
+ producer1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ for (int i=10; i<20; i++)
+ {
+ producer1.send(session0.createTextMessage("message " + i));
+ }
+
+ ServerManagement.killAndWait(1);
+
+ consumer0 = session0.createConsumer(queue[0]);
+ do
+ {
+ msg = (TextMessage)consumer0.receive(5000);
+ if (msg!=null)
+ {
+ log.info("msg = " + msg.getText());
+ numberOfMessagesReceived++;
+ }
+ } while (msg!=null);
+
+ session0.commit();
+
+ assertEquals(20, numberOfMessagesReceived);
+ }
+ finally
+ {
+ if (conn0!=null)
+ {
+ conn0.close();
+ }
+
+ if (conn1!=null)
+ {
+ conn1.close();
+ }
+ }
+ }
+
+ public void testMergeQueue2() throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+
+ int numberOfMessagesReceived = 0;
+
+ try
+ {
+
+ // Objects Server0
+ conn0 = cf.createConnection();
+
+ assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+ Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
+
+ conn0.start();
+
+ MessageProducer producer0 = session0.createProducer(queue[0]);
+
+ producer0.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ MessageConsumer consumer0 = session0.createConsumer(queue[0]);
+
+ for (int i=0; i<10; i++)
+ {
+ producer0.send(session0.createTextMessage("message " + i));
+ }
+
+ session0.commit();
+
+ TextMessage msg;
+
+ do
+ {
+ msg = (TextMessage)consumer0.receive(5000);
+ if (msg!=null)
+ {
+ log.info("msg = " + msg.getText());
+ numberOfMessagesReceived++;
+ }
+ if (numberOfMessagesReceived==5)
+ {
+ break;
+ }
+ } while (msg!=null);
+
+ session0.commit();
+ consumer0.close();
+
+
+ // Objects Server1
+ conn1 = cf.createConnection();
+
+ assertEquals(1, ((JBossConnection)conn1).getServerID());
+
+ conn1.start();
+
+ Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer producer1 = session1.createProducer(this.queue[1]);
+
+ producer1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ for (int i=10; i<20; i++)
+ {
+ producer1.send(session0.createTextMessage("message " + i));
+ }
+
+ session1.commit();
+
+ MessageConsumer consumer1 = session1.createConsumer(queue[0]);
+
+ ServerManagement.killAndWait(1);
+
+ TextMessage msgAfterKill = (TextMessage)consumer1.receive(1000);
+ assertNotNull(msgAfterKill);
+ session1.commit();
+
+ consumer1.close();
+
+ consumer0 = session0.createConsumer(queue[0]);
+ do
+ {
+ msg = (TextMessage)consumer0.receive(5000);
+ if (msg!=null)
+ {
+ log.info("msg = " + msg.getText());
+ numberOfMessagesReceived++;
+ }
+ } while (msg!=null);
+
+ session0.commit();
+
+ assertEquals(19, numberOfMessagesReceived);
+ }
+ finally
+ {
+ if (conn0!=null)
+ {
+ conn0.close();
+ }
+
+ if (conn1!=null)
+ {
+ conn1.close();
+ }
+ }
+ }
+
+ public void testMergeQueueSimple() throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+
+ try
+ {
+ // Objects Server0
+ conn0 = cf.createConnection();
+
+ assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+ conn1 = cf.createConnection();
+
+ assertEquals(1, ((JBossConnection)conn1).getServerID());
+
+ //Send some messages on node 0
+
+ Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer producer0 = session0.createProducer(queue[0]);
+
+ for (int i = 0; i < 10; i++)
+ {
+ producer0.send(session0.createTextMessage("message " + i));
+ }
+
+ session0.commit();
+
+
+ //Send some more on node 1
+
+ Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer producer1 = session1.createProducer(queue[1]);
+
+ for (int i = 10; i < 20; i++)
+ {
+ producer1.send(session1.createTextMessage("message " + i));
+ }
+
+ session1.commit();
+
+
+ //Make sure messages exist
+
+ MessageConsumer cons0 = session0.createConsumer(queue[0]);
+
+ conn0.start();
+
+ TextMessage tm;
+
+ for (int i = 0; i < 10; i++)
+ {
+ tm = (TextMessage)cons0.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message " + i, tm.getText());
+ }
+
+ tm = (TextMessage)cons0.receive(1000);
+
+ assertNull(tm);
+
+ session0.rollback();
+
+ cons0.close();
+
+ cons0 = null;
+
+
+ MessageConsumer cons1 = session1.createConsumer(queue[0]);
+
+ conn1.start();
+
+ for (int i = 10; i < 20; i++)
+ {
+ tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message " + i, tm.getText());
+ }
+
+ tm = (TextMessage)cons1.receive(1000);
+
+ assertNull(tm);
+
+ session1.rollback();
+
+ cons1.close();
+
+ cons1 = null;
+
+
+ //Now kill the server
+
+ ServerManagement.killAndWait(1);
+
+ //Messages should all be available on node 0
+
+ cons0 = session0.createConsumer(queue[0]);
+
+ for (int i = 0; i < 20; i++)
+ {
+ tm = (TextMessage)cons0.receive(1000);
+
+ assertNotNull(tm);
+
+ log.info("received message " + tm.getText());
+
+
+ //assertEquals("message " + i, tm.getText());
+ }
+
+ tm = (TextMessage)cons0.receive(1000);
+
+ assertNull(tm);
+
+ }
+ finally
+ {
+ if (conn0!=null)
+ {
+ conn0.close();
+ }
+
+ if (conn1!=null)
+ {
+ conn1.close();
+ }
+ }
+ }
+
+ // Fil consumer
+
+ /*
+ * Both queues paging > fullsize
+ */
+ public void testMergeQueuePagingFill1() throws Exception
+ {
+ mergeQueuePaging(20, 20, 10, 10, true);
+ }
+
+ /*
+ * Both queues paging = fullsize
+ */
+ public void testMergeQueuePagingFill2() throws Exception
+ {
+ mergeQueuePaging(10, 10, 10, 10, true);
+ }
+
+ /*
+ * First queue paging, second queue not > full size
+ */
+ public void testMergeQueuePagingFill3() throws Exception
+ {
+ mergeQueuePaging(20, 5, 10, 10, true);
+ }
+
+ /*
+ * Second queue paging, first queue not > full size
+ */
+ public void testMergeQueuePagingFill4() throws Exception
+ {
+ mergeQueuePaging(5, 20, 10, 10, true);
+ }
+
+ /*
+ * First queue paging, second queue not = full size
+ */
+ public void testMergeQueuePagingFill5() throws Exception
+ {
+ mergeQueuePaging(10, 5, 10, 10, true);
+ }
+
+ /*
+ * Second queue paging, first queue not = full size
+ */
+ public void testMergeQueuePagingFill6() throws Exception
+ {
+ mergeQueuePaging(5, 10, 10, 10, true);
+ }
+
+ // Don't fill consumer
+
+ /*
+ * Both queues paging > fullsize
+ */
+ public void testMergeQueuePagingNoFill1() throws Exception
+ {
+ mergeQueuePaging(20, 20, 10, 10, false);
+ }
+
+ /*
+ * Both queues paging = fullsize
+ */
+ public void testMergeQueuePagingNoFill2() throws Exception
+ {
+ mergeQueuePaging(10, 10, 10, 10, false);
+ }
+
+ /*
+ * First queue paging, second queue not > full size
+ */
+ public void testMergeQueuePagingNoFill3() throws Exception
+ {
+ mergeQueuePaging(20, 5, 10, 10, false);
+ }
+
+ /*
+ * Second queue paging, first queue not > full size
+ */
+ public void testMergeQueuePagingNoFill4() throws Exception
+ {
+ mergeQueuePaging(5, 20, 10, 10, false);
+ }
+
+ /*
+ * First queue paging, second queue not = full size
+ */
+ public void testMergeQueuePagingNoFill5() throws Exception
+ {
+ mergeQueuePaging(10, 5, 10, 10, false);
+ }
+
+ /*
+ * Second queue paging, first queue not = full size
+ */
+ public void testMergeQueuePagingNoFill6() throws Exception
+ {
+ mergeQueuePaging(5, 10, 10, 10, false);
+ }
+
+ /*
+ * Both queues paging on merge
+ */
+ private void mergeQueuePaging(int messages0, int messages1, int full0, int full1, boolean fillConsumer) throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+
+ try
+ {
+ //Deploy queue with fullSize of 10
+
+ ServerManagement.deployQueue("constrainedQueue", "queue/constrainedQueue",full0, 2, 2, 0, true);
+
+ ServerManagement.deployQueue("constrainedQueue", "queue/constrainedQueue",full1, 2, 2, 1, true);
+
+ Queue queue0 = (Queue)ic[0].lookup("queue/constrainedQueue");
+
+ Queue queue1 = (Queue)ic[1].lookup("queue/constrainedQueue");
+
+ // Objects Server0
+ conn0 = cf.createConnection();
+
+ assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+ conn1 = cf.createConnection();
+
+ assertEquals(1, ((JBossConnection)conn1).getServerID());
+
+ //Send some messages on node 0
+
+ Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer producer0 = session0.createProducer(queue0);
+
+ for (int i = 0; i < messages0; i++)
+ {
+ producer0.send(session0.createTextMessage("message " + i));
+ }
+
+ session0.commit();
+
+
+ //Send some more on node 1
+
+ Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer producer1 = session1.createProducer(queue1);
+
+ for (int i = messages0; i < messages0 + messages1; i++)
+ {
+ producer1.send(session1.createTextMessage("message " + i));
+ }
+
+ session1.commit();
+
+
+ //Make sure messages exist
+
+ MessageConsumer cons0 = session0.createConsumer(queue0);
+
+ conn0.start();
+
+ TextMessage tm;
+
+ for (int i = 0; i < messages0; i++)
+ {
+ tm = (TextMessage)cons0.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message " + i, tm.getText());
+ }
+
+ tm = (TextMessage)cons0.receive(2000);
+
+ assertNull(tm);
+
+ session0.rollback();
+
+ cons0.close();
+
+ cons0 = null;
+
+
+ MessageConsumer cons1 = session1.createConsumer(queue1);
+
+ conn1.start();
+
+ for (int i = messages0; i < messages0 + messages1; i++)
+ {
+ tm = (TextMessage)cons1.receive(2000);
+
+ assertNotNull(tm);
+
+ assertEquals("message " + i, tm.getText());
+ }
+
+ tm = (TextMessage)cons1.receive(2000);
+
+ assertNull(tm);
+
+ session1.rollback();
+
+ cons1.close();
+
+ cons1 = null;
+
+ //Now kill the server
+
+ ServerManagement.killAndWait(1);
+
+ //Messages should all be available on node 0
+
+ conn0.start();
+
+ if (!fillConsumer)
+ {
+ Thread.sleep(5000);
+ }
+
+ //Creating the consumer immediately after kill should ensure that all the messages are in the consumer and
+ //not paged to disk
+ cons0 = session0.createConsumer(queue0);
+
+ log.info("now consuming");
+ for (int i = 0; i < messages0 + messages1; i++)
+ {
+ tm = (TextMessage)cons0.receive(5000);
+
+ assertNotNull(tm);
+
+ log.info("received message " + tm.getText());
+ }
+
+ tm = (TextMessage)cons0.receive(2000);
+
+ assertNull(tm);
+
+ }
+ finally
+ {
+ try
+ {
+ ServerManagement.undeployQueue("constrainedQueue", 0);
+ }
+ catch (Exception ignore)
+ {
+ }
+
+ try
+ {
+ ServerManagement.undeployQueue("constrainedQueue", 1);
+ }
+ catch (Exception ignore)
+ {
+ }
+
+
+ if (conn0!=null)
+ {
+ conn0.close();
+ }
+
+ if (conn1!=null)
+ {
+ conn1.close();
+ }
+ }
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ nodeCount = 2;
+
+ super.setUp();
+
+ log.debug("setup done");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Copied: trunk/tests/src/org/jboss/test/messaging/jms/manual/FailoverTest.java (from rev 2440, trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java)
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/manual/FailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/manual/FailoverTest.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -0,0 +1,172 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.manual;
+
+import java.util.Hashtable;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+
+import junit.framework.TestCase;
+
+import org.jboss.logging.Logger;
+
+public class FailoverTest extends TestCase
+{
+ private static final Logger log = Logger.getLogger(FailoverTest.class);
+
+
+ public FailoverTest(String name)
+ {
+ super(name);
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testNOOP()
+ {
+
+ }
+
+ public void testSendReceive() throws Exception
+ {
+ Hashtable properties = new Hashtable();
+
+ properties.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
+
+ properties.put("java.naming.provider.url", "jnp://192.168.1.11:1199");
+
+ properties.put("java.naming.factory.url", "org.jnp.interfaces");
+
+ log.info("Creaing ic");
+
+ InitialContext ic = new InitialContext(properties);
+
+ log.info("************ REMOTE");
+
+ Connection conn = null;
+
+ try
+ {
+ log.info("Created ic");
+
+ Queue queue = (Queue)ic.lookup("/queue/testDistributedQueue");
+
+ log.info("Looked up queue");
+
+ ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ log.info("Looked up cf");
+
+ conn = cf.createConnection();
+
+ Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sessCons = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sessCons.createConsumer(queue);
+
+ MessageListener list = new MyListener();
+
+ cons.setMessageListener(list);
+
+ conn.start();
+
+ MessageProducer prod = sessSend.createProducer(queue);
+
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ int count = 0;
+
+ while (true)
+ {
+ TextMessage tm = sessSend.createTextMessage("message " + count);
+
+ prod.send(tm);
+
+ log.info("sent " + count);
+
+ count++;
+
+ //Thread.sleep(250);
+ }
+
+
+ }
+ catch (Exception e)
+ {
+ log.error("Failed", e);
+ throw e;
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ log.info("closing connetion");
+ try
+ {
+ conn.close();
+ }
+ catch (Exception ignore)
+ {
+ }
+ log.info("closed connection");
+ }
+ }
+ }
+
+ class MyListener implements MessageListener
+ {
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ TextMessage tm = (TextMessage)msg;
+
+ log.info("Received message " + tm.getText());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to receive", e);
+ }
+ }
+
+ }
+}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-02-26 19:25:34 UTC (rev 2449)
@@ -939,6 +939,16 @@
insureStarted();
servers[0].getServer().deployQueue(name, jndiName, fullSize, pageSize, downCacheSize, false);
}
+
+ /**
+ * Simulates a queue deployment (copying the queue descriptor in the deploy directory).
+ */
+ public static void deployQueue(String name, String jndiName, int fullSize, int pageSize,
+ int downCacheSize, int serverIndex, boolean clustered) throws Exception
+ {
+ insureStarted();
+ servers[serverIndex].getServer().deployQueue(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
+ }
/**
* Simulates a queue un-deployment (deleting the queue descriptor from the deploy directory).
More information about the jboss-cvs-commits
mailing list