[jboss-cvs] JBoss Messaging SVN: r2449 - in trunk: src/etc/server/default/deploy and 13 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Feb 26 14:25:34 EST 2007


Author: timfox
Date: 2007-02-26 14:25:34 -0500 (Mon, 26 Feb 2007)
New Revision: 2449

Added:
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/manual/FailoverTest.java
Removed:
   trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java
Modified:
   trunk/src/etc/aop-messaging-client.xml
   trunk/src/etc/server/default/deploy/clustered-db2-persistence-service.xml
   trunk/src/etc/server/default/deploy/clustered-mssql-persistence-service.xml
   trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
   trunk/src/etc/server/default/deploy/clustered-oracle-persistence-service.xml
   trunk/src/etc/server/default/deploy/clustered-postgresql-persistence-service.xml
   trunk/src/etc/server/default/deploy/clustered-sybase-persistence-service.xml
   trunk/src/etc/server/default/deploy/db2-persistence-service.xml
   trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
   trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
   trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
   trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
   trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
   trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
   trunk/src/main/org/jboss/jms/client/FailoverValve2.java
   trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
   trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
   trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
Fixed consumer.close() and queue merging bugs


Modified: trunk/src/etc/aop-messaging-client.xml
===================================================================
--- trunk/src/etc/aop-messaging-client.xml	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/aop-messaging-client.xml	2007-02-26 19:25:34 UTC (rev 2449)
@@ -49,13 +49,16 @@
       <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
       <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
       <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>               
-   </bind>  
-    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->start())">
+   </bind>
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->$implementing{org.jboss.jms.server.endpoint.ConnectionEndpoint}(..))">
+      <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>         
+   </bind>      
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->start())">
        <advice name="handleStart" aspect="org.jboss.jms.client.container.ConnectionAspect"/>
-    </bind>
-    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->stop())">
-       <advice name="handleStop" aspect="org.jboss.jms.client.container.ConnectionAspect"/>
-    </bind>
+   </bind>
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->stop())">
+      <advice name="handleStop" aspect="org.jboss.jms.client.container.ConnectionAspect"/>
+   </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->createConnectionConsumer(..))">
       <advice name="handleCreateConnectionConsumer" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
@@ -89,9 +92,6 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->createSessionDelegate(..))">
       <advice name="handleCreateSessionDelegate" aspect="org.jboss.jms.client.container.StateCreationAspect"/>
    </bind>
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConnectionDelegate->$implementing{org.jboss.jms.server.endpoint.ConnectionEndpoint}(..))">
-      <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>         
-   </bind> 
 
    <!--
         Session Stack
@@ -100,8 +100,11 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->$implementing{org.jboss.jms.delegate.SessionDelegate}(..))">
       <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
       <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
-      <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>       
+      <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>             
    </bind>
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->$implementing{org.jboss.jms.server.endpoint.SessionEndpoint}(..))">
+      <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>         
+   </bind> 
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createMessage())">
       <advice name="handleCreateMessage" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
@@ -183,9 +186,6 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createBrowserDelegate(..))">
       <advice name="handleCreateBrowserDelegate" aspect="org.jboss.jms.client.container.StateCreationAspect"/>
    </bind>
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->$implementing{org.jboss.jms.server.endpoint.SessionEndpoint}(..))">
-      <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>         
-   </bind> 
    
    <!--
         Consumer Stack
@@ -195,7 +195,10 @@
       <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
       <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
       <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>          
-   </bind>       
+   </bind>  
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->$implementing{org.jboss.jms.server.endpoint.ConsumerEndpoint}(..))">
+      <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>         
+   </bind>         
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getMessageListener())">
       <advice name="handleGetMessageListener" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         
    </bind>
@@ -219,10 +222,7 @@
    </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getMessageSelector())">
       <advice name="handleGetMessageSelector" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         
-   </bind>   
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->$implementing{org.jboss.jms.server.endpoint.ConsumerEndpoint}(..))">
-      <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>         
-   </bind>    
+   </bind>      
 
    
    <!--
@@ -287,14 +287,14 @@
       <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
       <interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
       <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
+   </bind>   
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->$implementing{org.jboss.jms.server.endpoint.BrowserEndpoint}(..))">
+      <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>         
    </bind>      
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->nextMessage())">
       <advice name="handleNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
    </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->hasNextMessage())">
       <advice name="handleHasNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
-   </bind>        
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->$implementing{org.jboss.jms.server.endpoint.BrowserEndpoint}(..))">
-      <interceptor-ref name="org.jboss.jms.client.container.FailoverValveInterceptor"/>         
-   </bind> 
+   </bind>         
 </aop>
\ No newline at end of file

Modified: trunk/src/etc/server/default/deploy/clustered-db2-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-db2-persistence-service.xml	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/clustered-db2-persistence-service.xml	2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
    ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
    LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
    LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
    UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?

Modified: trunk/src/etc/server/default/deploy/clustered-mssql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mssql-persistence-service.xml	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/clustered-mssql-persistence-service.xml	2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
    ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
    LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
    LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
    UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?

Modified: trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
    ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
    LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
    LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD     
    UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?

Modified: trunk/src/etc/server/default/deploy/clustered-oracle-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-oracle-persistence-service.xml	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/clustered-oracle-persistence-service.xml	2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
    ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
    LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
    LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
    UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?

Modified: trunk/src/etc/server/default/deploy/clustered-postgresql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-postgresql-persistence-service.xml	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/clustered-postgresql-persistence-service.xml	2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
    ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
    LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
    LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
    UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?

Modified: trunk/src/etc/server/default/deploy/clustered-sybase-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-sybase-persistence-service.xml	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/clustered-sybase-persistence-service.xml	2007-02-26 19:25:34 UTC (rev 2449)
@@ -39,6 +39,7 @@
    ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
    LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
    LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
    UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?

Modified: trunk/src/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/db2-persistence-service.xml	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/db2-persistence-service.xml	2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
    ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
    LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
    LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
    UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?

Modified: trunk/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mssql-persistence-service.xml	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/mssql-persistence-service.xml	2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
    ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
    LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
    LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
    UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?

Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
    ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
    LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
    LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
    UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?

Modified: trunk/src/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/oracle-persistence-service.xml	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/oracle-persistence-service.xml	2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
    ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
    LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
    LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
    UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?

Modified: trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/postgresql-persistence-service.xml	2007-02-26 19:25:34 UTC (rev 2449)
@@ -38,6 +38,7 @@
    ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
    LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
    LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
    UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?

Modified: trunk/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/sybase-persistence-service.xml	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/etc/server/default/deploy/sybase-persistence-service.xml	2007-02-26 19:25:34 UTC (rev 2449)
@@ -39,6 +39,7 @@
    ROLLBACK_MESSAGE_REF2=UPDATE JBM_MSG_REF SET STATE='C', TRANSACTION_ID = NULL WHERE TRANSACTION_ID=? AND STATE='-'
    LOAD_PAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, PAGE_ORD, SCHED_DELIVERY FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
    LOAD_UNPAGED_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD
+   LOAD_REFS=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' AND CHANNEL_ID = ? ORDER BY ORD
    UPDATE_REFS_NOT_PAGED=UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    SELECT_EXISTS_REF=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?

Modified: trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/FailoverCommandCenter.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -85,8 +85,6 @@
 
          valve.close();
          
-         log.debug(this + " starting client-side failover");
-
          synchronized(this)
          {
             // testing for failed connection and setting the failed flag need to be done in one
@@ -102,6 +100,9 @@
             remotingConnection.setFailed();
          }
          
+         //Note - failover doesn't occur until _after_ the above check - so the following log statement belongs here
+         log.debug(this + " starting client-side failover");
+         
          // generate a FAILOVER_STARTED event. The event must be broadcasted AFTER valve closure,
          // to insure the client-side stack is in a deterministic state
          broadcastFailoverEvent(new FailoverEvent(FailoverEvent.FAILOVER_STARTED, this));

Modified: trunk/src/main/org/jboss/jms/client/FailoverValve2.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverValve2.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/FailoverValve2.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -39,16 +39,30 @@
 
    private static final Logger log = Logger.getLogger(FailoverValve2.class);
 
+
    // Static ---------------------------------------------------------------------------------------
 
    private static boolean trace = log.isTraceEnabled();
 
    // Attributes -----------------------------------------------------------------------------------
 
+   // Only use this in trace mode
+   private Set threads;
+
    private int count;
+   
    private boolean locked;
-   private Set threads = new HashSet();
 
+   public FailoverValve2()
+   {
+      trace = log.isTraceEnabled();
+      
+      if (trace)
+      {
+         threads = new HashSet();
+      }
+   }
+
    // Constructors ---------------------------------------------------------------------------------
 
    // Public ---------------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -171,9 +171,15 @@
          return invocation.invokeNext();
       }
       finally
-      {
+      {                  
          if (isClosing)
          {
+            //We make sure we remove ourselves AFTER the invocation has been made,
+            //otherwise in a failover situation we would end up divorced from the hierarchy
+            //and failover would not occur properly, since it would not be able to
+            //traverse the hierarchy and update the delegates properly
+            removeSelf(invocation);
+            
             closing();
          }
          else if (isClose)
@@ -297,11 +303,20 @@
             }
          }
       }
-      
-      // Remove from the parent
+   }
+   
+   /**
+    * Remove from parent
+    * 
+    * @param invocation the invocation
+    */
+   protected void removeSelf(Invocation invocation)
+   {                  
+      HierarchicalState state = ((DelegateSupport)invocation.getTargetObject()).getState();
+            
       HierarchicalState parent = state.getParent();
       if (parent != null)
-      {         
+      {                  
          parent.getChildren().remove(state);
       }
    }

Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -34,6 +34,7 @@
 import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.util.MessageQueueNameHelper;
+import org.jboss.logging.Logger;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -53,6 +54,8 @@
 {
    // Constants -----------------------------------------------------
    
+   private static final Logger log = Logger.getLogger(ConsumerAspect.class);
+   
    // Static --------------------------------------------------------
 
    // Attributes ----------------------------------------------------
@@ -114,11 +117,21 @@
    {      
       ConsumerState consumerState = getState(invocation);
         
-      
       // First we call close on the messagecallbackhandler which waits for onMessage invocations      
       // to complete any further messages received will be ignored
       consumerState.getMessageCallbackHandler().close();
+                        
+      // Then we make sure closing is called on the ServerConsumerEndpoint.
+
+      Object res = invocation.invokeNext();
       
+      //Now we send a message to the server consumer with the last delivery id so
+      //it can cancel any inflight messages after that
+      //This needs to be done *after* the call to closing has been executed on the server
+      //maybe it can be combined with closing
+      
+      ConsumerDelegate del = (ConsumerDelegate)invocation.getTargetObject();
+      
       long lastDeliveryId = consumerState.getMessageCallbackHandler().getLastDeliveryId();
       
       SessionState sessionState = (SessionState)consumerState.getParent();
@@ -128,17 +141,6 @@
 
       CallbackManager cm = connectionState.getRemotingConnection().getCallbackManager();
       cm.unregisterHandler(consumerState.getConsumerID());
-            
-      // Then we make sure closing is called on the ServerConsumerEndpoint.
-
-      Object res = invocation.invokeNext();
-      
-      //Now we send a message to the server consumer with the last delivery id so
-      //it can cancel any inflight messages after that
-      //This needs to be done *after* the call to closing has been executed on the server
-      //maybe it can be combined with closing
-      
-      ConsumerDelegate del = (ConsumerDelegate)invocation.getTargetObject();
          
       //Now we need to cancel any inflight messages - this must be done before
       //cancelling the message callback handler buffer, so that messages end up back in the channel

Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -150,7 +150,7 @@
    // Protected ------------------------------------------------------------------------------------     
    
    protected Object doInvoke(Client client, RequestSupport req) throws JMSException
-   {
+   {      
       return doInvoke(client, req, false);
    }
    
@@ -177,6 +177,7 @@
          }
          else
          {
+
             if (trace) { log.trace(this + " invoking " + req + " synchronously on server using " + client); }
 
             resp = client.invoke(req);

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -241,7 +241,7 @@
       //TODO - we temporarily need to execute on a different thread to avoid a deadlock situation in
       //       failover where a message is sent then the valve is locked, and the message send cause
       //       a message delivery back to the same client which tries to ack but can't get through
-      //       the valve.
+      //       the valve. This won't be necessary when we move to a non blocking transport
       this.sessionExecutor.execute(
          new Runnable()
          {
@@ -259,7 +259,7 @@
          });
    }
    
-   public void handleMessageInternal(Object message) throws Exception
+   private void handleMessageInternal(Object message) throws Exception
    {
       MessageProxy proxy = (MessageProxy) message;
 

Modified: trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConnectionState.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/jms/client/state/ConnectionState.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -153,12 +153,13 @@
       idGenerator = newState.idGenerator;
       serverID = newState.serverID;
       versionToUse = newState.versionToUse;
-
+      
       ConnectionDelegate newDelegate = (ConnectionDelegate)newState.getDelegate();
-
+      
       for(Iterator i = getChildren().iterator(); i.hasNext(); )
-      {
+      {                 
          SessionState sessionState = (SessionState)i.next();
+         
          ClientSessionDelegate sessionDelegate = (ClientSessionDelegate)sessionState.getDelegate();
 
          // create a new session on the new connection for each session on the old connection

Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -526,8 +526,10 @@
          firstPagingOrder = ili.getMinPageOrdering().longValue();
          
          nextPagingOrder = ili.getMaxPageOrdering().longValue() + 1;
+                           
+         paging = true;
          
-         paging = true;
+         log.info("set paging to true");
       }
       else
       {

Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -1017,9 +1017,9 @@
       }
    }
    
-   public InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int fullSize) throws Exception
+   public InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int numberToLoad, long firstPagingOrder, long nextPagingOrder) throws Exception
    {
-      if (trace) { log.trace("Merging channel from " + fromChannelID + " to " + toChannelID); }
+      if (trace) { log.trace("Merging channel from " + fromChannelID + " to " + toChannelID + " numberToLoad:" + numberToLoad + " firstPagingOrder:" + firstPagingOrder + " nextPagingOrder:" + nextPagingOrder); }
       
       Connection conn = null;
       PreparedStatement ps = null;
@@ -1031,96 +1031,139 @@
       {
          conn = ds.getConnection();
          
-         // First swap the channel id
+         /*
+          * If channel is paging and has full size f
+          * 
+          * then we don't need to load any refs but we need to:
+          * 
+          * make sure the page ord is correct across the old paged and new refs
+          * 
+          * we know the max page ord (from the channel) for the old refs so we just need to:
+          * 
+          * 1) Iterate through the failed channel and update page_ord = max + 1, max + 2 etc
+          * 
+          * 2) update channel id
+          * 
+          * 
+          * If channel is not paging and the total refs before and after <=f
+          * 
+          * 1) Load all refs from failed channel
+          * 
+          * 2) Update channel id
+          * 
+          * return those refs
+          * 
+          * 
+          * If channel is not paging but total new refs > f
+          * 
+          * 1) Iterate through failed channel refs and take the first x to make the channel full
+          * 
+          * 2) Update the others with page_ord starting at zero 
+          * 
+          * 3) Update channel id
+          * 
+          * In general:
+          * 
+          * We have number to load n, max page size p
+          * 
+          * 1) Iterate through failed channel refs in page_ord order
+          * 
+          * 2) Put the first n in a List.
+          * 
+          * 3) Initialise page_ord_count to be p or 0 depending on whether it was specified
+          * 
+          * 4) Update the page_ord of the remaining refs accordingly
+          * 
+          * 5) Update the channel id
+          * 
+          */
          
-         ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+         //First load the refs from the failed channel
+
+         List refs = new ArrayList();
          
-         ps.setLong(1, toChannelID);
+         ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
          
-         ps.setLong(2, fromChannelID);
-         
-         int rows = updateWithRetry(ps);
-         
-         if (trace) { log.trace("Update channel id updated " + rows + " rows"); }
-         
-         //Now set any pages refs to not paged
-         
-         ps = conn.prepareStatement(getSQLStatement("UPDATE_REFS_NOT_PAGED"));
+         ps.setLong(1, fromChannelID);
                  
-         ps.setLong(1, 0);
-         
-         ps.setLong(2, Integer.MAX_VALUE);
-         
-         ps.setLong(3, toChannelID);
-         
-         rows = updateWithRetry(ps);
-         
-         if (trace) { log.trace(" Set paged refs updated " + rows + " rows"); }
-         
-         //Now load the refs
-         
-         ps = conn.prepareStatement(getSQLStatement("LOAD_UNPAGED_REFS"));
-         
-         ps.setLong(1, toChannelID);
-                 
          rs = ps.executeQuery();
          
-         List refs = new ArrayList();
-                          
          int count = 0;
-         int pageOrd = 0;
          
          boolean arePaged = false;
          
+         long pageOrd = nextPagingOrder;
+         
          while (rs.next())
          {
             long msgId = rs.getLong(1);            
             int deliveryCount = rs.getInt(2);
             long sched = rs.getLong(3);
             
-            ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
+            if (count < numberToLoad)
+            {           
+               ReferenceInfo ri = new ReferenceInfo(msgId, deliveryCount, sched);
+               
+               refs.add(ri);
+            }
             
-            if (count < fullSize)
+            // Set page ord
+            
+            if (ps2 == null)
             {
-               refs.add(ri);
-            }   
-            else
+               ps2 = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+            }
+                
+            if (count < numberToLoad)
             {
-               //These ones need to be made paged
-                              
-               if (ps2 == null)
-               {
-                  ps2 = conn.prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
-                   
-                  ps2.setLong(1, pageOrd);
-                  
-                  ps2.setLong(2, msgId);
-                  
-                  ps2.setLong(3, toChannelID);
-                  
-                  rows = updateWithRetry(ps2);
-                  
-                  if (trace) { log.trace("Update page ord updated " + rows + " rows"); }
-                  
-                  pageOrd++;
-                  
-                  arePaged = true;
-                           
-               }
+               ps2.setNull(1, Types.BIGINT);
+               
+               if (trace) { log.trace("Set page ord to null"); }
             }
+            else
+            {                                 
+               ps2.setLong(1, pageOrd);
+               
+               if (trace) { log.trace("Set page ord to " + pageOrd); }
+               
+               arePaged = true; 
+               
+               pageOrd++;                      
+            }
             
-            count++;
+            ps2.setLong(2, msgId);
+            
+            ps2.setLong(3, fromChannelID);
+            
+            int rows = updateWithRetry(ps2);
+            
+            if (trace) { log.trace("Update page ord updated " + rows + " rows"); }
+
+            count++;            
          }
          
+         ps.close();
+         
+         // Now swap the channel id
+         
+         ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+         
+         ps.setLong(1, toChannelID);
+         
+         ps.setLong(2, fromChannelID);
+         
+         int rows = updateWithRetry(ps);
+         
+         if (trace) { log.trace("Update channel id updated " + rows + " rows"); }
+                           
          if (arePaged)
-         {
-            return new InitialLoadInfo(new Long(0), new Long(pageOrd - 1), refs);
+         {            
+            return new InitialLoadInfo(new Long(firstPagingOrder), new Long(pageOrd - 1), refs);
          }
          else
          {
             return new InitialLoadInfo(null, null, refs);
-         }
-         
+         }         
       }
       catch (Exception e)
       {
@@ -3406,6 +3449,9 @@
       map.put("LOAD_UNPAGED_REFS",
               "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' " +
               "AND CHANNEL_ID = ? AND PAGE_ORD IS NULL ORDER BY ORD");
+      map.put("LOAD_REFS",
+              "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' " +
+              "AND CHANNEL_ID = ? ORDER BY ORD");      
       
       map.put("UPDATE_REFS_NOT_PAGED", "UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?");       
       map.put("SELECT_MIN_MAX_PAGE_ORD", "SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?");

Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -70,7 +70,7 @@
    
    InitialLoadInfo loadFromStart(long channelID, int fullSize) throws Exception;
    
-   InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int fullSize) throws Exception;
+   InitialLoadInfo mergeAndLoad(long fromChannelID, long toChannelID, int numberToLoad, long firstPagingOrder, long nextPagingOrder) throws Exception;  
      
    List getMessages(List messageIds) throws Exception;
          

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -233,15 +233,24 @@
    {
       if (trace) { log.trace("Merging queue " + remoteQueue + " into " + this); }
            
+      log.info("queue is paging:" + this.paging + " message refs size " +
+               this.messageRefs.size() + " fullsize:" + this.fullSize +
+               " delivering:" + this.deliveringCount.get());
+      
       synchronized (refLock)
       {
          flushDownCache();
                   
          PersistenceManager.InitialLoadInfo ili =
-            pm.mergeAndLoad(remoteQueue.getChannelID(), this.channelID, fullSize - messageRefs.size());
+            pm.mergeAndLoad(remoteQueue.getChannelID(), channelID, fullSize - messageRefs.size(), firstPagingOrder, nextPagingOrder);
             
-         doLoad(ili);
+         if (trace) { log.trace("Loaded " + ili.getRefInfos().size() + " refs"); }            
+                           
+         log.info("firstpageord:" + ili.getMinPageOrdering() + " lastpageord:" + ili.getMaxPageOrdering());
          
+         doLoad(ili);         
+         
+         deliverInternal();
       }
    }
    

Deleted: trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -1,172 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms;
-
-import java.util.Hashtable;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.InitialContext;
-
-import junit.framework.TestCase;
-
-import org.jboss.logging.Logger;
-
-public class FailoverTest extends TestCase
-{
-   private static final Logger log = Logger.getLogger(FailoverTest.class);
-   
-
-   public FailoverTest(String name)
-   {
-      super(name);
-   }
-
-   protected void setUp() throws Exception
-   {
-      super.setUp();      
-   }
-
-   protected void tearDown() throws Exception
-   {
-      super.tearDown();
-   }
-   
-   public void testNOOP()
-   {
-      
-   }
-   
-//   public void testSendReceive() throws Exception
-//   {
-//      Hashtable properties = new Hashtable();
-//
-//      properties.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
-//
-//      properties.put("java.naming.provider.url", "jnp://192.168.1.11:1199");
-//
-//      properties.put("java.naming.factory.url", "org.jnp.interfaces");
-//
-//      log.info("Creaing ic");
-//
-//      InitialContext ic = new InitialContext(properties);
-//
-//      log.info("************ REMOTE");
-//
-//      Connection conn = null;
-//
-//      try
-//      {
-//         log.info("Created ic");
-//
-//         Queue queue = (Queue)ic.lookup("/queue/testDistributedQueue");
-//
-//         log.info("Looked up queue");
-//
-//         ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
-//
-//         log.info("Looked up cf");
-//
-//         conn = cf.createConnection();
-//
-//         Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-//         Session sessCons = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-//         MessageConsumer cons = sessCons.createConsumer(queue);
-//
-//         MessageListener list = new MyListener();
-//
-//         cons.setMessageListener(list);
-//
-//         conn.start();
-//
-//         MessageProducer prod = sessSend.createProducer(queue);
-//
-//         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
-//
-//         int count = 0;
-//
-//         while (true)
-//         {
-//            TextMessage tm = sessSend.createTextMessage("message " + count);
-//
-//            prod.send(tm);
-//
-//            log.info("sent " + count);
-//
-//            count++;
-//
-//            //Thread.sleep(250);
-//         }
-//
-//
-//      }
-//      catch (Exception e)
-//      {
-//         log.error("Failed", e);
-//         throw e;
-//      }
-//      finally
-//      {
-////         if (conn != null)
-////         {
-////            log.info("closing connetion");
-////            try
-////            {
-////               conn.close();
-////            }
-////            catch (Exception ignore)
-////            {
-////            }
-////            log.info("closed connection");
-////         }
-//      }
-//   }
-   
-   class MyListener implements MessageListener
-   {
-
-      public void onMessage(Message msg)
-      {
-         try
-         {
-            TextMessage tm = (TextMessage)msg;
-            
-            log.info("Received message " + tm.getText());
-         }
-         catch (Exception e)
-         {
-            log.error("Failed to receive", e);
-         }
-      }
-      
-   }
-}

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -1895,218 +1895,8 @@
       }
    }
 
-   public void testMergeQueue() throws Exception
-   {
-      Connection conn0 = null;
-      Connection conn1 = null;
+   
 
-      int numberOfMessagesReceived = 0;
-
-      try
-      {
-
-         // Objects Server0
-         conn0 = cf.createConnection();
-
-         assertEquals(0, ((JBossConnection)conn0).getServerID());
-
-         Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
-
-         conn0.start();
-
-         MessageProducer producer0 = session0.createProducer(queue[0]);
-
-         producer0.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-         MessageConsumer consumer0 = session0.createConsumer(queue[0]);
-
-         for (int i=0; i<10; i++)
-         {
-            producer0.send(session0.createTextMessage("message " + i));
-         }
-
-         session0.commit();
-
-         TextMessage msg;
-
-         do
-         {
-            msg = (TextMessage)consumer0.receive(5000);
-            if (msg!=null)
-            {
-               log.info("msg = " + msg.getText());
-               numberOfMessagesReceived++;
-            }
-            if (numberOfMessagesReceived==5)
-            {
-               break;
-            }
-         } while (msg!=null);
-
-         session0.commit();
-         consumer0.close();
-
-
-         // Objects Server1
-         conn1 = cf.createConnection();
-
-         assertEquals(1, ((JBossConnection)conn1).getServerID());
-
-         conn1.start();
-
-         Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         MessageProducer producer1 = session1.createProducer(this.queue[1]);
-
-         producer1.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-         for (int i=10; i<20; i++)
-         {
-            producer1.send(session0.createTextMessage("message " + i));
-         }
-
-         ServerManagement.killAndWait(1);
-
-         consumer0 = session0.createConsumer(queue[0]);
-         do
-         {
-            msg = (TextMessage)consumer0.receive(5000);
-            if (msg!=null)
-            {
-               log.info("msg = " + msg.getText());
-               numberOfMessagesReceived++;
-            }
-         } while (msg!=null);
-
-         session0.commit();
-
-         assertEquals(20, numberOfMessagesReceived);
-      }
-      finally
-      {
-         if (conn0!=null)
-         {
-            conn0.close();
-         }
-
-         if (conn1!=null)
-         {
-            conn1.close();
-         }
-      }
-   }
-
-   public void testMergeQueue2() throws Exception
-   {
-      Connection conn0 = null;
-      Connection conn1 = null;
-
-      int numberOfMessagesReceived = 0;
-
-      try
-      {
-
-         // Objects Server0
-         conn0 = cf.createConnection();
-
-         assertEquals(0, ((JBossConnection)conn0).getServerID());
-
-         Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
-
-         conn0.start();
-
-         MessageProducer producer0 = session0.createProducer(queue[0]);
-
-         producer0.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-         MessageConsumer consumer0 = session0.createConsumer(queue[0]);
-
-         for (int i=0; i<10; i++)
-         {
-            producer0.send(session0.createTextMessage("message " + i));
-         }
-
-         session0.commit();
-
-         TextMessage msg;
-
-         do
-         {
-            msg = (TextMessage)consumer0.receive(5000);
-            if (msg!=null)
-            {
-               log.info("msg = " + msg.getText());
-               numberOfMessagesReceived++;
-            }
-            if (numberOfMessagesReceived==5)
-            {
-               break;
-            }
-         } while (msg!=null);
-
-         session0.commit();
-         consumer0.close();
-
-
-         // Objects Server1
-         conn1 = cf.createConnection();
-
-         assertEquals(1, ((JBossConnection)conn1).getServerID());
-
-         conn1.start();
-
-         Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
-
-         MessageProducer producer1 = session1.createProducer(this.queue[1]);
-
-         producer1.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-         for (int i=10; i<20; i++)
-         {
-            producer1.send(session0.createTextMessage("message " + i));
-         }
-
-         session1.commit();
-
-         MessageConsumer consumer1 = session1.createConsumer(queue[0]);
-
-         ServerManagement.killAndWait(1);
-
-         TextMessage msgAfterKill = (TextMessage)consumer1.receive(1000);
-         assertNotNull(msgAfterKill);
-         session1.commit();
-
-         consumer1.close();
-
-         consumer0 = session0.createConsumer(queue[0]);
-         do
-         {
-            msg = (TextMessage)consumer0.receive(5000);
-            if (msg!=null)
-            {
-               log.info("msg = " + msg.getText());
-               numberOfMessagesReceived++;
-            }
-         } while (msg!=null);
-
-         session0.commit();
-
-         assertEquals(19, numberOfMessagesReceived);
-      }
-      finally
-      {
-         if (conn0!=null)
-         {
-            conn0.close();
-         }
-
-         if (conn1!=null)
-         {
-            conn1.close();
-         }
-      }
-   }
-
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Added: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -0,0 +1,689 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+
+/**
+ * 
+ * A MergeQueueTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class MergeQueueTest extends ClusteringTestBase
+{
+   // Constants ------------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public MergeQueueTest(String name)
+   {
+      super(name);
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   public void testMergeQueue() throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+
+      int numberOfMessagesReceived = 0;
+
+      try
+      {
+
+         // Objects Server0
+         conn0 = cf.createConnection();
+
+         assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+         Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
+
+         conn0.start();
+
+         MessageProducer producer0 = session0.createProducer(queue[0]);
+
+         producer0.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         MessageConsumer consumer0 = session0.createConsumer(queue[0]);
+
+         for (int i=0; i<10; i++)
+         {
+            producer0.send(session0.createTextMessage("message " + i));
+         }
+
+         session0.commit();
+
+         TextMessage msg;
+
+         do
+         {
+            msg = (TextMessage)consumer0.receive(5000);
+            if (msg!=null)
+            {
+               log.info("msg = " + msg.getText());
+               numberOfMessagesReceived++;
+            }
+            if (numberOfMessagesReceived==5)
+            {
+               break;
+            }
+         } while (msg!=null);
+
+         session0.commit();
+         consumer0.close();
+
+
+         // Objects Server1
+         conn1 = cf.createConnection();
+
+         assertEquals(1, ((JBossConnection)conn1).getServerID());
+
+         conn1.start();
+
+         Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer1 = session1.createProducer(this.queue[1]);
+
+         producer1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         for (int i=10; i<20; i++)
+         {
+            producer1.send(session0.createTextMessage("message " + i));
+         }
+
+         ServerManagement.killAndWait(1);
+
+         consumer0 = session0.createConsumer(queue[0]);
+         do
+         {
+            msg = (TextMessage)consumer0.receive(5000);
+            if (msg!=null)
+            {
+               log.info("msg = " + msg.getText());
+               numberOfMessagesReceived++;
+            }
+         } while (msg!=null);
+
+         session0.commit();
+
+         assertEquals(20, numberOfMessagesReceived);
+      }
+      finally
+      {
+         if (conn0!=null)
+         {
+            conn0.close();
+         }
+
+         if (conn1!=null)
+         {
+            conn1.close();
+         }
+      }
+   }
+
+   public void testMergeQueue2() throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+
+      int numberOfMessagesReceived = 0;
+
+      try
+      {
+
+         // Objects Server0
+         conn0 = cf.createConnection();
+
+         assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+         Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
+
+         conn0.start();
+
+         MessageProducer producer0 = session0.createProducer(queue[0]);
+
+         producer0.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         MessageConsumer consumer0 = session0.createConsumer(queue[0]);
+
+         for (int i=0; i<10; i++)
+         {
+            producer0.send(session0.createTextMessage("message " + i));
+         }
+
+         session0.commit();
+
+         TextMessage msg;
+
+         do
+         {
+            msg = (TextMessage)consumer0.receive(5000);
+            if (msg!=null)
+            {
+               log.info("msg = " + msg.getText());
+               numberOfMessagesReceived++;
+            }
+            if (numberOfMessagesReceived==5)
+            {
+               break;
+            }
+         } while (msg!=null);
+
+         session0.commit();
+         consumer0.close();
+
+
+         // Objects Server1
+         conn1 = cf.createConnection();
+
+         assertEquals(1, ((JBossConnection)conn1).getServerID());
+
+         conn1.start();
+
+         Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+
+         MessageProducer producer1 = session1.createProducer(this.queue[1]);
+
+         producer1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         for (int i=10; i<20; i++)
+         {
+            producer1.send(session0.createTextMessage("message " + i));
+         }
+
+         session1.commit();
+
+         MessageConsumer consumer1 = session1.createConsumer(queue[0]);
+
+         ServerManagement.killAndWait(1);
+
+         TextMessage msgAfterKill = (TextMessage)consumer1.receive(1000);
+         assertNotNull(msgAfterKill);
+         session1.commit();
+
+         consumer1.close();
+
+         consumer0 = session0.createConsumer(queue[0]);
+         do
+         {
+            msg = (TextMessage)consumer0.receive(5000);
+            if (msg!=null)
+            {
+               log.info("msg = " + msg.getText());
+               numberOfMessagesReceived++;
+            }
+         } while (msg!=null);
+
+         session0.commit();
+
+         assertEquals(19, numberOfMessagesReceived);
+      }
+      finally
+      {
+         if (conn0!=null)
+         {
+            conn0.close();
+         }
+
+         if (conn1!=null)
+         {
+            conn1.close();
+         }
+      }
+   }
+
+   /**
+    * Checks that the messages held by a killed node become available on the surviving node.
+    * Ten messages are sent through a connection to node 0 and ten more through a connection to
+    * node 1; each node is first verified to deliver only its own ten messages, then node 1 is
+    * killed and all twenty messages are expected from a consumer created on node 0's session.
+    *
+    * @throws Exception if a JMS or server management call fails
+    */
+   public void testMergeQueueSimple() throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+
+      try
+      {
+         // The first two connections are asserted to be attached to node 0 and node 1
+         conn0 = cf.createConnection();
+
+         assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+         conn1 = cf.createConnection();
+
+         assertEquals(1, ((JBossConnection)conn1).getServerID());
+
+         //Send some messages on node 0
+
+         Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
+
+         MessageProducer producer0 = session0.createProducer(queue[0]);
+
+         for (int i = 0; i < 10; i++)
+         {
+            producer0.send(session0.createTextMessage("message " + i));
+         }
+
+         session0.commit();
+
+         //Send some more on node 1
+
+         Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+
+         MessageProducer producer1 = session1.createProducer(queue[1]);
+
+         for (int i = 10; i < 20; i++)
+         {
+            producer1.send(session1.createTextMessage("message " + i));
+         }
+
+         session1.commit();
+
+         //Make sure messages exist: node 0 must deliver exactly messages 0..9
+
+         MessageConsumer cons0 = session0.createConsumer(queue[0]);
+
+         conn0.start();
+
+         TextMessage tm;
+
+         for (int i = 0; i < 10; i++)
+         {
+            tm = (TextMessage)cons0.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message " + i, tm.getText());
+         }
+
+         tm = (TextMessage)cons0.receive(1000);
+
+         assertNull(tm);
+
+         // Roll back so the messages just received stay in the queue for the merge check below
+         session0.rollback();
+
+         cons0.close();
+
+         // Node 1 must deliver exactly messages 10..19.
+         // NOTE(review): this consumer is created on queue[0] although the producer above used
+         // queue[1]; presumably both refer to the same distributed queue - confirm.
+         MessageConsumer cons1 = session1.createConsumer(queue[0]);
+
+         conn1.start();
+
+         for (int i = 10; i < 20; i++)
+         {
+            tm = (TextMessage)cons1.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message " + i, tm.getText());
+         }
+
+         tm = (TextMessage)cons1.receive(1000);
+
+         assertNull(tm);
+
+         session1.rollback();
+
+         cons1.close();
+
+         //Now kill the server
+
+         ServerManagement.killAndWait(1);
+
+         //Messages should all be available on node 0
+
+         cons0 = session0.createConsumer(queue[0]);
+
+         for (int i = 0; i < 20; i++)
+         {
+            tm = (TextMessage)cons0.receive(1000);
+
+            assertNotNull(tm);
+
+            log.info("received message " + tm.getText());
+
+            // NOTE(review): the message text is deliberately not asserted here (the check was
+            // disabled in the original); presumably the order after a merge is not
+            // guaranteed - confirm.
+         }
+
+         tm = (TextMessage)cons0.receive(1000);
+
+         assertNull(tm);
+
+      }
+      finally
+      {
+         // Close both connections even if closing the first one throws, so that a failure in
+         // conn0.close() cannot leak conn1.
+         try
+         {
+            if (conn0 != null)
+            {
+               conn0.close();
+            }
+         }
+         finally
+         {
+            if (conn1 != null)
+            {
+               conn1.close();
+            }
+         }
+      }
+   }
+   
+   // Fill consumer
+   
+   /*
+    * Fill variant: both nodes receive 20 messages, more than the full size of 10
+    */
+   public void testMergeQueuePagingFill1() throws Exception
+   {
+      final int fullSize = 10;
+      final int onEachNode = 20;
+
+      mergeQueuePaging(onEachNode, onEachNode, fullSize, fullSize, true);
+   }
+   
+   /*
+    * Fill variant: both nodes receive 10 messages, exactly the full size of 10
+    */
+   public void testMergeQueuePagingFill2() throws Exception
+   {
+      final int fullSize = 10;
+      final int onEachNode = 10;
+
+      mergeQueuePaging(onEachNode, onEachNode, fullSize, fullSize, true);
+   }
+   
+   /*
+    * Fill variant: node 0 receives 20 messages (more than the full size of 10),
+    * node 1 receives 5 (fewer than the full size)
+    */
+   public void testMergeQueuePagingFill3() throws Exception
+   {
+      final int fullSize = 10;
+      final int onNode0 = 20;
+      final int onNode1 = 5;
+
+      mergeQueuePaging(onNode0, onNode1, fullSize, fullSize, true);
+   }
+   
+   /*
+    * Fill variant: node 1 receives 20 messages (more than the full size of 10),
+    * node 0 receives 5 (fewer than the full size)
+    */
+   public void testMergeQueuePagingFill4() throws Exception
+   {
+      final int fullSize = 10;
+      final int onNode0 = 5;
+      final int onNode1 = 20;
+
+      mergeQueuePaging(onNode0, onNode1, fullSize, fullSize, true);
+   }
+   
+   /*
+    * Fill variant: node 0 receives 10 messages (exactly the full size of 10),
+    * node 1 receives 5 (fewer than the full size)
+    */
+   public void testMergeQueuePagingFill5() throws Exception
+   {
+      final int fullSize = 10;
+      final int onNode0 = 10;
+      final int onNode1 = 5;
+
+      mergeQueuePaging(onNode0, onNode1, fullSize, fullSize, true);
+   }
+   
+   /*
+    * Fill variant: node 1 receives 10 messages (exactly the full size of 10),
+    * node 0 receives 5 (fewer than the full size)
+    */
+   public void testMergeQueuePagingFill6() throws Exception
+   {
+      final int fullSize = 10;
+      final int onNode0 = 5;
+      final int onNode1 = 10;
+
+      mergeQueuePaging(onNode0, onNode1, fullSize, fullSize, true);
+   }
+   
+   // Don't fill consumer
+   
+   /*
+    * No-fill variant: both nodes receive 20 messages, more than the full size of 10
+    */
+   public void testMergeQueuePagingNoFill1() throws Exception
+   {
+      final int fullSize = 10;
+      final int onEachNode = 20;
+
+      mergeQueuePaging(onEachNode, onEachNode, fullSize, fullSize, false);
+   }
+   
+   /*
+    * No-fill variant: both nodes receive 10 messages, exactly the full size of 10
+    */
+   public void testMergeQueuePagingNoFill2() throws Exception
+   {
+      final int fullSize = 10;
+      final int onEachNode = 10;
+
+      mergeQueuePaging(onEachNode, onEachNode, fullSize, fullSize, false);
+   }
+   
+   /*
+    * No-fill variant: node 0 receives 20 messages (more than the full size of 10),
+    * node 1 receives 5 (fewer than the full size)
+    */
+   public void testMergeQueuePagingNoFill3() throws Exception
+   {
+      final int fullSize = 10;
+      final int onNode0 = 20;
+      final int onNode1 = 5;
+
+      mergeQueuePaging(onNode0, onNode1, fullSize, fullSize, false);
+   }
+   
+   /*
+    * No-fill variant: node 1 receives 20 messages (more than the full size of 10),
+    * node 0 receives 5 (fewer than the full size)
+    */
+   public void testMergeQueuePagingNoFill4() throws Exception
+   {
+      final int fullSize = 10;
+      final int onNode0 = 5;
+      final int onNode1 = 20;
+
+      mergeQueuePaging(onNode0, onNode1, fullSize, fullSize, false);
+   }
+   
+   /*
+    * No-fill variant: node 0 receives 10 messages (exactly the full size of 10),
+    * node 1 receives 5 (fewer than the full size)
+    */
+   public void testMergeQueuePagingNoFill5() throws Exception
+   {
+      final int fullSize = 10;
+      final int onNode0 = 10;
+      final int onNode1 = 5;
+
+      mergeQueuePaging(onNode0, onNode1, fullSize, fullSize, false);
+   }
+   
+   /*
+    * No-fill variant: node 1 receives 10 messages (exactly the full size of 10),
+    * node 0 receives 5 (fewer than the full size)
+    */
+   public void testMergeQueuePagingNoFill6() throws Exception
+   {
+      final int fullSize = 10;
+      final int onNode0 = 5;
+      final int onNode1 = 10;
+
+      mergeQueuePaging(onNode0, onNode1, fullSize, fullSize, false);
+   }
+   
+   /**
+    * Shared scenario for the merge-with-paging tests. Deploys a clustered "constrainedQueue" on
+    * both nodes with the given full sizes (page size and down-cache size are both 2), sends
+    * messages0 messages through node 0 and messages1 messages through node 1, verifies that each
+    * node delivers only its own messages, kills node 1 and finally expects
+    * messages0 + messages1 messages from a consumer on node 0.
+    *
+    * @param messages0 number of messages sent through the connection to node 0
+    * @param messages1 number of messages sent through the connection to node 1
+    * @param full0 fullSize passed when deploying the queue on node 0
+    * @param full1 fullSize passed when deploying the queue on node 1
+    * @param fillConsumer if true the consumer on node 0 is created immediately after the kill;
+    *        if false the test sleeps for five seconds before creating it
+    * @throws Exception if a JMS, JNDI or server management call fails
+    */
+   private void mergeQueuePaging(int messages0, int messages1, int full0, int full1, boolean fillConsumer) throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+
+      try
+      {
+         //Deploy the clustered queue on each node with its own fullSize (pageSize 2, downCacheSize 2)
+
+         ServerManagement.deployQueue("constrainedQueue", "queue/constrainedQueue",full0, 2, 2, 0, true);
+
+         ServerManagement.deployQueue("constrainedQueue", "queue/constrainedQueue",full1, 2, 2, 1, true);
+
+         Queue queue0 = (Queue)ic[0].lookup("queue/constrainedQueue");
+
+         Queue queue1 = (Queue)ic[1].lookup("queue/constrainedQueue");
+
+         // The first two connections are asserted to be attached to node 0 and node 1
+         conn0 = cf.createConnection();
+
+         assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+         conn1 = cf.createConnection();
+
+         assertEquals(1, ((JBossConnection)conn1).getServerID());
+
+         //Send some messages on node 0
+
+         Session session0 = conn0.createSession(true, Session.SESSION_TRANSACTED);
+
+         MessageProducer producer0 = session0.createProducer(queue0);
+
+         for (int i = 0; i < messages0; i++)
+         {
+            producer0.send(session0.createTextMessage("message " + i));
+         }
+
+         session0.commit();
+
+         //Send some more on node 1
+
+         Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+
+         MessageProducer producer1 = session1.createProducer(queue1);
+
+         for (int i = messages0; i < messages0 + messages1; i++)
+         {
+            producer1.send(session1.createTextMessage("message " + i));
+         }
+
+         session1.commit();
+
+         //Make sure messages exist: node 0 must deliver exactly its own messages, in order
+
+         MessageConsumer cons0 = session0.createConsumer(queue0);
+
+         conn0.start();
+
+         TextMessage tm;
+
+         for (int i = 0; i < messages0; i++)
+         {
+            tm = (TextMessage)cons0.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message " + i, tm.getText());
+         }
+
+         tm = (TextMessage)cons0.receive(2000);
+
+         assertNull(tm);
+
+         // Roll back so the messages just received stay in the queue for the merge check below
+         session0.rollback();
+
+         cons0.close();
+
+         // Node 1 must deliver exactly its own messages, in order
+         MessageConsumer cons1 = session1.createConsumer(queue1);
+
+         conn1.start();
+
+         for (int i = messages0; i < messages0 + messages1; i++)
+         {
+            tm = (TextMessage)cons1.receive(2000);
+
+            assertNotNull(tm);
+
+            assertEquals("message " + i, tm.getText());
+         }
+
+         tm = (TextMessage)cons1.receive(2000);
+
+         assertNull(tm);
+
+         session1.rollback();
+
+         cons1.close();
+
+         //Now kill the server
+
+         ServerManagement.killAndWait(1);
+
+         //Messages should all be available on node 0
+
+         conn0.start();
+
+         if (!fillConsumer)
+         {
+            // NOTE(review): presumably this gives the merge time to finish before any consumer
+            // exists on node 0 - confirm.
+            Thread.sleep(5000);
+         }
+
+         //When fillConsumer is true, creating the consumer immediately after kill should ensure
+         //that all the messages are in the consumer and not paged to disk
+         cons0 = session0.createConsumer(queue0);
+
+         log.info("now consuming");
+         for (int i = 0; i < messages0 + messages1; i++)
+         {
+            tm = (TextMessage)cons0.receive(5000);
+
+            assertNotNull(tm);
+
+            log.info("received message " + tm.getText());
+         }
+
+         tm = (TextMessage)cons0.receive(2000);
+
+         assertNull(tm);
+
+      }
+      finally
+      {
+         // Undeployment is best effort: node 1 has normally been killed by this point and the
+         // queue may never have been deployed if the test failed early.
+         try
+         {
+            ServerManagement.undeployQueue("constrainedQueue", 0);
+         }
+         catch (Exception ignore)
+         {
+         }
+
+         try
+         {
+            ServerManagement.undeployQueue("constrainedQueue", 1);
+         }
+         catch (Exception ignore)
+         {
+         }
+
+         // Close both connections even if closing the first one throws, so that a failure in
+         // conn0.close() cannot leak conn1.
+         try
+         {
+            if (conn0 != null)
+            {
+               conn0.close();
+            }
+         }
+         finally
+         {
+            if (conn1 != null)
+            {
+               conn1.close();
+            }
+         }
+      }
+   }
+   
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      nodeCount = 2; // set before super.setUp(); presumably the base class starts this many nodes - TODO confirm
+
+      super.setUp();
+
+      log.debug("setup done");
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown(); // no test-specific cleanup here; simply delegates to the base class
+   }
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+   
+}

Copied: trunk/tests/src/org/jboss/test/messaging/jms/manual/FailoverTest.java (from rev 2440, trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java)
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/manual/FailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/manual/FailoverTest.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -0,0 +1,172 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.manual;
+
+import java.util.Hashtable;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+
+import junit.framework.TestCase;
+
+import org.jboss.logging.Logger;
+
+/**
+ * Manual test that connects to a hard-coded remote JNDI server, attaches a logging listener to
+ * "/queue/testDistributedQueue" and then sends persistent text messages to that queue in an
+ * endless loop. The loop only ends when a call throws, so the test is meant to be started and
+ * observed by hand rather than run as part of an automated suite.
+ */
+public class FailoverTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(FailoverTest.class);
+
+
+   public FailoverTest(String name)
+   {
+      super(name);
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   /**
+    * Intentionally empty test.
+    */
+   public void testNOOP()
+   {
+
+   }
+
+   /**
+    * Looks up the queue and connection factory from the remote JNDI server, registers a
+    * {@link MyListener} on the queue and sends "message N" text messages without pause until an
+    * exception is thrown. The exception is logged and rethrown; the connection and the JNDI
+    * context are closed on the way out.
+    *
+    * @throws Exception the failure that terminated the send loop, or any setup failure
+    */
+   public void testSendReceive() throws Exception
+   {
+      Hashtable properties = new Hashtable();
+
+      properties.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
+
+      // NOTE(review): hard-coded address of the server under test; adjust for the local setup
+      properties.put("java.naming.provider.url", "jnp://192.168.1.11:1199");
+
+      properties.put("java.naming.factory.url", "org.jnp.interfaces");
+
+      log.info("Creaing ic");
+
+      InitialContext ic = new InitialContext(properties);
+
+      log.info("************ REMOTE");
+
+      Connection conn = null;
+
+      try
+      {
+         log.info("Created ic");
+
+         Queue queue = (Queue)ic.lookup("/queue/testDistributedQueue");
+
+         log.info("Looked up queue");
+
+         ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+         log.info("Looked up cf");
+
+         conn = cf.createConnection();
+
+         Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         Session sessCons = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = sessCons.createConsumer(queue);
+
+         MessageListener list = new MyListener();
+
+         cons.setMessageListener(list);
+
+         conn.start();
+
+         MessageProducer prod = sessSend.createProducer(queue);
+
+         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         int count = 0;
+
+         // Deliberately endless: the loop is left only when send (or message creation) throws
+         while (true)
+         {
+            TextMessage tm = sessSend.createTextMessage("message " + count);
+
+            prod.send(tm);
+
+            log.info("sent " + count);
+
+            count++;
+
+            //Thread.sleep(250);
+         }
+
+
+      }
+      catch (Exception e)
+      {
+         log.error("Failed", e);
+         throw e;
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            log.info("closing connetion");
+            try
+            {
+               conn.close();
+            }
+            catch (Exception ignore)
+            {
+               // Best effort: a failure while closing must not mask the original exception
+            }
+            log.info("closed connection");
+         }
+
+         // Release the JNDI context as well; it was previously never closed
+         try
+         {
+            ic.close();
+         }
+         catch (Exception ignore)
+         {
+            // Best effort: a failure while closing must not mask the original exception
+         }
+      }
+   }
+
+   /**
+    * Listener that logs the text of every message it receives. Declared static because it
+    * uses only the static logger and needs no reference to the enclosing test instance.
+    */
+   static class MyListener implements MessageListener
+   {
+
+      public void onMessage(Message msg)
+      {
+         try
+         {
+            TextMessage tm = (TextMessage)msg;
+
+            log.info("Received message " + tm.getText());
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to receive", e);
+         }
+      }
+
+   }
+}

Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-02-26 13:18:42 UTC (rev 2448)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-02-26 19:25:34 UTC (rev 2449)
@@ -939,6 +939,16 @@
       insureStarted();
       servers[0].getServer().deployQueue(name, jndiName, fullSize, pageSize, downCacheSize, false);
    }
+   
+   /**
+    * Simulates a queue deployment (copying the queue descriptor in the deploy directory) on the
+    * server selected by serverIndex. Calls insureStarted() first and then forwards all queue
+    * settings unchanged to that server's deployQueue.
+    *
+    * @param name the queue name passed to the server
+    * @param jndiName the JNDI name passed to the server
+    * @param fullSize the fullSize setting passed to the server
+    * @param pageSize the pageSize setting passed to the server
+    * @param downCacheSize the downCacheSize setting passed to the server
+    * @param serverIndex index into the servers array of the server to deploy on
+    * @param clustered the clustered flag passed to the server
+    * @throws Exception if insureStarted() or the server-side deployment fails
+    */
+   public static void deployQueue(String name, String jndiName, int fullSize, int pageSize,
+                                  int downCacheSize, int serverIndex, boolean clustered) throws Exception
+   {
+      insureStarted();
+      servers[serverIndex].getServer().deployQueue(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
+   }
 
    /**
     * Simulates a queue un-deployment (deleting the queue descriptor from the deploy directory).




More information about the jboss-cvs-commits mailing list