[jboss-cvs] JBoss Messaging SVN: r1363 - in trunk: docs/examples/secure-socket/etc src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/jms/server/plugin src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Sep 26 10:49:48 EDT 2006


Author: timfox
Date: 2006-09-26 10:49:27 -0400 (Tue, 26 Sep 2006)
New Revision: 1363

Added:
   trunk/src/etc/xmdesc/DefaultPostOffice-xmbean.xml
   trunk/src/main/org/jboss/messaging/core/plugin/DefaultPostOfficeService.java
Removed:
   trunk/src/etc/xmdesc/SimplePostOffice-xmbean.xml
   trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java
Modified:
   trunk/docs/examples/secure-socket/etc/messaging-secure-socket-service.xml
   trunk/src/etc/server/default/deploy/destinations-service.xml
   trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
   trunk/src/etc/server/default/deploy/messaging-service.xml
   trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
   trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
   trunk/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManager.java
   trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/plugin/JDBCShutdownLogger.java
   trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Log:
More clustering tweaks



Modified: trunk/docs/examples/secure-socket/etc/messaging-secure-socket-service.xml
===================================================================
--- trunk/docs/examples/secure-socket/etc/messaging-secure-socket-service.xml	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/docs/examples/secure-socket/etc/messaging-secure-socket-service.xml	2006-09-26 14:49:27 UTC (rev 1363)
@@ -21,7 +21,7 @@
             <invoker transport="sslsocket">
                <attribute name="marshaller" isParam="true">org.jboss.jms.server.remoting.JMSWireFormat</attribute>
                <attribute name="unmarshaller" isParam="true">org.jboss.jms.server.remoting.JMSWireFormat</attribute>
-               <attribute name="serializationtype" isParam="true">jboss</attribute>
+               <attribute name="serializationtype" isParam="true">jms</attribute>
                <attribute name="dataType" isParam="true">jms</attribute>
                <attribute name="socket.check_connection" isParam="true">false</attribute>
                <attribute name="timeout">0</attribute>

Modified: trunk/src/etc/server/default/deploy/destinations-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/destinations-service.xml	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/etc/server/default/deploy/destinations-service.xml	2006-09-26 14:49:27 UTC (rev 1363)
@@ -16,6 +16,7 @@
       name="jboss.messaging.destination:service=Queue,name=DLQ"
       xmbean-dd="xmdesc/Queue-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=QueuePostOffice</depends>
    </mbean>
 
    <!--
@@ -26,6 +27,7 @@
       name="jboss.messaging.destination:service=Topic,name=testTopic"
       xmbean-dd="xmdesc/Topic-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=TopicPostOffice</depends>
       <attribute name="SecurityConfig">
          <security>
             <role name="guest" read="true" write="true"/>
@@ -39,6 +41,7 @@
       name="jboss.messaging.destination:service=Topic,name=securedTopic"
       xmbean-dd="xmdesc/Topic-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=TopicPostOffice</depends>
       <attribute name="SecurityConfig">
          <security>
             <role name="publisher" read="true" write="true" create="false"/>
@@ -50,6 +53,7 @@
       name="jboss.messaging.destination:service=Topic,name=testDurableTopic"
       xmbean-dd="xmdesc/Topic-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=TopicPostOffice</depends>
       <attribute name="SecurityConfig">
          <security>
             <role name="guest" read="true" write="true"/>
@@ -63,6 +67,7 @@
       name="jboss.messaging.destination:service=Queue,name=testQueue"
       xmbean-dd="xmdesc/Queue-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=QueuePostOffice</depends>
       <attribute name="SecurityConfig">
          <security>
             <role name="guest" read="true" write="true"/>
@@ -76,36 +81,42 @@
       name="jboss.messaging.destination:service=Queue,name=A"
       xmbean-dd="xmdesc/Queue-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=QueuePostOffice</depends>
    </mbean>
 
    <mbean code="org.jboss.jms.server.destination.QueueService"
       name="jboss.messaging.destination:service=Queue,name=B"
       xmbean-dd="xmdesc/Queue-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=QueuePostOffice</depends>
    </mbean>
 
    <mbean code="org.jboss.jms.server.destination.QueueService"
       name="jboss.messaging.destination:service=Queue,name=C"
       xmbean-dd="xmdesc/Queue-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=QueuePostOffice</depends>
    </mbean>
 
    <mbean code="org.jboss.jms.server.destination.QueueService"
       name="jboss.messaging.destination:service=Queue,name=D"
       xmbean-dd="xmdesc/Queue-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=QueuePostOffice</depends>
    </mbean>
 
    <mbean code="org.jboss.jms.server.destination.QueueService"
       name="jboss.messaging.destination:service=Queue,name=ex"
       xmbean-dd="xmdesc/Queue-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=QueuePostOffice</depends>
    </mbean>
    
    <mbean code="org.jboss.jms.server.destination.QueueService"
       name="jboss.messaging.destination:service=Queue,name=openTopic"
       xmbean-dd="xmdesc/Topic-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=QueuePostOffice</depends>
       <attribute name="SecurityConfig">
          <security>
             <role name="guest" read="true" write="true" create="true"/>
@@ -119,6 +130,7 @@
       name="jboss.messaging.destination:service=Queue,name=ClusteredQueue1"
       xmbean-dd="xmdesc/Queue-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=QueuePostOffice</depends>
       <attribute name="Clustered">true</attribute>
    </mbean>   
    
@@ -126,6 +138,7 @@
       name="jboss.messaging.destination:service=Topic,name=ClusteredTopic1"
       xmbean-dd="xmdesc/Topic-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=TopicPostOffice</depends>
       <attribute name="Clustered">true</attribute>
    </mbean>   
 

Modified: trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml	2006-09-26 14:49:27 UTC (rev 1363)
@@ -21,7 +21,7 @@
       <attribute name="UsingBatchUpdates">true</attribute>
    </mbean>
 
-   <mbean code="org.jboss.messaging.core.plugin.SimplePostOfficeService"
+   <mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
       name="jboss.messaging:service=QueuePostOffice"
       xmbean-dd="xmdesc/SimplePostOffice-xmbean.xml"> 
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends> 
@@ -32,7 +32,7 @@
       <attribute name="CreateTablesOnStartup">true</attribute>
    </mbean>
    
-   <mbean code="org.jboss.messaging.core.plugin.SimplePostOfficeService"
+   <mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
       name="jboss.messaging:service=TopicPostOffice"
       xmbean-dd="xmdesc/SimplePostOffice-xmbean.xml"> 
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
@@ -45,7 +45,7 @@
    
    <!--
    
-   Uncomment this and comment out the one above to enable clustered topics
+   Uncomment this to enable clustered destinations
    
    <mbean code="org.jboss.messaging.core.plugin.ClusteredTopicPostOfficeService"
       name="jboss.messaging:service=TopicPostOffice"
@@ -55,64 +55,48 @@
       <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
       <attribute name="PostOfficeName">Clustered Topic</attribute>
       <attribute name="DataSource">java:/DefaultDS</attribute>
-      <attribute name="CreateTablesOnStartup">true</attribute>      
+      <attribute name="CreateTablesOnStartup">true</attribute>            
+      <attribute name="SqlProperties"><![CDATA[
       <attribute name="GroupName">cluster1</attribute>
       <attribute name="StateTimeout">5000</attribute>
       <attribute name="CastTimeout">5000</attribute>
       <attribute name="PullSize">1</attribute>
       <attribute name="StatsSendPeriod">1000</attribute>
       <attribute name="SyncChannelConfig">
-         <UDP mcast_addr="228.8.8.8" mcast_port="45568"
-              ip_ttl="8" ip_mcast="true"
-              mcast_send_buf_size="800000" mcast_recv_buf_size="150000"
-              ucast_send_buf_size="800000" ucast_recv_buf_size="150000"
-              loopback="false"/>
-         <PING timeout="2000" num_initial_members="3"
-              up_thread="true" down_thread="true"/>
-         <MERGE2 min_interval="10000" max_interval="20000"/>
-         <FD shun="true" up_thread="true" down_thread="true"
-              timeout="2500" max_tries="5"/>
-         <VERIFY_SUSPECT timeout="3000" num_msgs="3"
-              up_thread="true" down_thread="true"/>
-         <pbcast.NAKACK gc_lag="50" retransmit_timeout="300,600,1200,2400,4800"
-              max_xmit_size="8192"
-              up_thread="true" down_thread="true"/>
-         <pbcast.STABLE desired_avg_gossip="20000"
-              up_thread="true" down_thread="true"/>
-         <FRAG frag_size="8192"
-              down_thread="true" up_thread="true"/>
-         <pbcast.GMS join_timeout="5000" join_retry_timeout="2000"
-              shun="true" print_local_addr="true"/>
-         <pbcast.STATE_TRANSFER up_thread="true" down_thread="true"/>
+         <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
+        mcast_port="45566" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
+        mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="false" ucast_send_buf_size="32000" ip_ttl="32"/>
+         <AUTOCONF down_thread="false" up_thread="false"/>
+	      <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
+	      <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+	      <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+	      <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+	      <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
+	                   retransmit_timeout="100,200,600,1200,2400,4800"/>
+	      <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
+	      <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
+	      <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
+	      <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
+	      <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
       </attribute>      
       
-      TODO
-      Do we need failure detection and group management in this stack ?? 
-      
       <attribute name="AsyncChannelConfig">
-         <UDP mcast_addr="${jboss.partition.udpGroup:228.1.2.3}" mcast_port="45566"
-               ip_ttl="8" ip_mcast="true"
-               mcast_send_buf_size="800000" mcast_recv_buf_size="150000"
-               ucast_send_buf_size="800000" ucast_recv_buf_size="150000"
-               loopback="false"/>
-        <PING timeout="2000" num_initial_members="3"
-           up_thread="true" down_thread="true"/>
-        <MERGE2 min_interval="10000" max_interval="20000"/>
-        <FD shun="true" up_thread="true" down_thread="true"
-           timeout="2500" max_tries="5"/>
-        <VERIFY_SUSPECT timeout="3000" num_msgs="3"
-           up_thread="true" down_thread="true"/>
-        <pbcast.NAKACK gc_lag="50" retransmit_timeout="300,600,1200,2400,4800"
-           max_xmit_size="8192"
-           up_thread="true" down_thread="true"/>
-        <UNICAST timeout="300,600,1200,2400,4800" window_size="100" min_threshold="10"
-           down_thread="true"/>
-        <pbcast.STABLE desired_avg_gossip="20000"
-           up_thread="true" down_thread="true"/>
-        <FRAG frag_size="8192"
-           down_thread="true" up_thread="true"/>
-        <pbcast.GMS join_timeout="5000" join_retry_timeout="2000"
-           shun="true" print_local_addr="true"/>
+         <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
+        mcast_port="45568" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
+        mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="false" ucast_send_buf_size="32000" ip_ttl="32"/>
+	      <AUTOCONF down_thread="false" up_thread="false"/>
+	      <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
+	      <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+	      <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+	      <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+	      <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
+	                   retransmit_timeout="100,200,600,1200,2400,4800"/>
+	      <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
+	      <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
+	      <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
+	      <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
+	      <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
+	      <pbcast.STATE_TRANSFER down_thread="false" up_thread="false"/>
       </attribute>      
    </mbean>
    

Modified: trunk/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/messaging-service.xml	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/etc/server/default/deploy/messaging-service.xml	2006-09-26 14:49:27 UTC (rev 1363)
@@ -21,13 +21,13 @@
          <arg type="java.lang.String" value="/topic" />
       </constructor>
 
-      <depends optional-attribute-name="PersistenceManager">jboss.messaging:service=PersistenceManager</depends>
-      <depends optional-attribute-name="QueuePostOffice">jboss.messaging:service=QueuePostOffice</depends>
-      <depends optional-attribute-name="TopicPostOffice">jboss.messaging:service=TopicPostOffice</depends>     
+      <depends optional-attribute-name="PersistenceManager">jboss.messaging:service=PersistenceManager</depends>    
       <depends optional-attribute-name="JMSUserManager">jboss.messaging:service=JMSUserManager</depends>
 	   <depends optional-attribute-name="ShutdownLogger">jboss.messaging:service=ShutdownLogger</depends>
 
-      <!-- Set to -1 to completely disable client leasing -->
+	   <attribute name="QueuePostOffice">jboss.messaging:service=QueuePostOffice</attribute>
+	   <attribute name="TopicPostOffice">jboss.messaging:service=TopicPostOffice</attribute>
+
       <attribute name="SecurityDomain">java:/jaas/messaging</attribute>
       <attribute name="DefaultSecurityConfig">
         <security>

Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2006-09-26 14:49:27 UTC (rev 1363)
@@ -1,14 +1,14 @@
 <?xml version="1.0" encoding="UTF-8"?>
 
 <!--
-     MySQl persistence deployment descriptor.
+     MySql persistence deployment descriptor.
 
      $Id$
  -->
 
 <server>
 
-   <mbean code="org.jboss.messaging.core.plugin.JDBCPersistenceManager"
+   <mbean code="org.jboss.messaging.core.plugin.JDBCPersistenceManagerService"
       name="jboss.messaging:service=PersistenceManager"
       xmbean-dd="xmdesc/JDBCPersistenceManager-xmbean.xml">      
       <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
@@ -17,111 +17,235 @@
       <attribute name="CreateTablesOnStartup">true</attribute>
       <attribute name="UsingBatchUpdates">true</attribute>      
       <attribute name="SqlProperties"><![CDATA[
-CREATE_MESSAGE_REF=CREATE TABLE JMS_MESSAGE_REFERENCE (CHANNELID BIGINT, MESSAGEID BIGINT, TRANSACTIONID BIGINT, STATE CHAR(1), ORD BIGINT, DELIVERYCOUNT INTEGER, RELIABLE CHAR(1), LOADED CHAR(1), PRIMARY KEY(CHANNELID, MESSAGEID)) 
+CREATE_MESSAGE_REFERENCE=CREATE TABLE JMS_MESSAGE_REFERENCE (CHANNELID BIGINT, MESSAGEID BIGINT, TRANSACTIONID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, DELIVERYCOUNT INTEGER, RELIABLE CHAR(1), LOADED CHAR(1), PRIMARY KEY(CHANNELID, MESSAGEID)) 
 CREATE_IDX_MESSAGE_REF_TX=CREATE INDEX JMS_MESSAGE_REF_TX ON JMS_MESSAGE_REFERENCE (TRANSACTIONID)         
 CREATE_IDX_MESSAGE_REF_ORD=CREATE INDEX JMS_MESSAGE_REF_ORD ON JMS_MESSAGE_REFERENCE (ORD)
+CREATE_IDX_MESSAGE_REF_PAGE_ORD=CREATE INDEX JMS_MESSAGE_REF_LOADED ON JMS_MESSAGE_REFERENCE (PAGE_ORD)
 CREATE_IDX_MESSAGE_REF_MESSAGEID=CREATE INDEX JMS_MESSAGE_REF_MESSAGEID ON JMS_MESSAGE_REFERENCE (MESSAGEID)
-CREATE_IDX_MESSAGE_REF_LOADED=CREATE INDEX JMS_MESSAGE_REF_LOADED ON JMS_MESSAGE_REFERENCE (LOADED)
 CREATE_IDX_MESSAGE_REF_RELIABLE=CREATE INDEX JMS_MESSAGE_REF_RELIABLE ON JMS_MESSAGE_REFERENCE (RELIABLE)
-INSERT_MESSAGE_REF=INSERT INTO JMS_MESSAGE_REFERENCE (CHANNELID, MESSAGEID, TRANSACTIONID, STATE, ORD, DELIVERYCOUNT, RELIABLE, LOADED) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+CREATE_MESSAGE=CREATE TABLE JMS_MESSAGE (MESSAGEID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, COREHEADERS MEDIUMBLOB, PAYLOAD LONGBLOB, CHANNELCOUNT INTEGER, TYPE TINYINT, JMSTYPE VARCHAR(255), CORRELATIONID VARCHAR(255), CORRELATIONID_BYTES VARBINARY(254), DESTINATION VARCHAR(255), REPLYTO VARCHAR(255), JMSPROPERTIES MEDIUMBLOB, PRIMARY KEY (MESSAGEID))
+CREATE_TRANSACTION=CREATE TABLE JMS_TRANSACTION (TRANSACTIONID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTIONID))
+CREATE_COUNTER=CREATE TABLE JMS_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))
+INSERT_MESSAGE_REF=INSERT INTO JMS_MESSAGE_REFERENCE (CHANNELID, MESSAGEID, TRANSACTIONID, STATE, ORD, PAGE_ORD, DELIVERYCOUNT, RELIABLE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_MESSAGE_REF=DELETE FROM JMS_MESSAGE_REFERENCE WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'
 UPDATE_MESSAGE_REF=UPDATE JMS_MESSAGE_REFERENCE SET TRANSACTIONID=?, STATE='-' WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'
-UPDATE_MESSAGE_REF_NOT_LOADED=UPDATE JMS_MESSAGE_REFERENCE SET LOADED='N' WHERE MESSAGEID=? AND CHANNELID=?
+UPDATE_PAGE_ORDER=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = ? WHERE MESSAGEID=? AND CHANNELID=?
 COMMIT_MESSAGE_REF1=UPDATE JMS_MESSAGE_REFERENCE SET STATE='C', TRANSACTIONID = NULL WHERE TRANSACTIONID=? AND STATE='+'
 COMMIT_MESSAGE_REF2=DELETE FROM JMS_MESSAGE_REFERENCE WHERE TRANSACTIONID=? AND STATE='-'
 ROLLBACK_MESSAGE_REF1=DELETE FROM JMS_MESSAGE_REFERENCE WHERE TRANSACTIONID=? AND STATE='+'
 ROLLBACK_MESSAGE_REF2=UPDATE JMS_MESSAGE_REFERENCE SET STATE='C', TRANSACTIONID = NULL WHERE TRANSACTIONID=? AND STATE='-'
-LOAD_REF_INFO=SELECT MESSAGEID, ORD, DELIVERYCOUNT, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID=? AND STATE <> '+' AND LOADED = 'N' AND ORD BETWEEN ? AND ? ORDER BY ORD
-SELECT_COUNT_REFS=SELECT COUNT(MESSAGEID) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID=? AND STATE <> '+' AND LOADED='N'      
-UPDATE_RELIABLE_REFS=UPDATE JMS_MESSAGE_REFERENCE SET LOADED='Y' WHERE ORD BETWEEN ? AND ? AND CHANNELID=? AND RELIABLE='Y' AND STATE <> '+'      
-UPDATE_RELIABLE_REFS_NOT_LOADED=UPDATE JMS_MESSAGE_REFERENCE SET LOADED='N' WHERE CHANNELID=?  
-SELECT_MIN_ORDERING=SELECT MIN(ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID=? AND STATE <> '+' AND LOADED = 'N'          
-DELETE_UNRELIABLE_REFS=DELETE FROM JMS_MESSAGE_REFERENCE WHERE RELIABLE = 'N'
-CREATE_MESSAGE=CREATE TABLE JMS_MESSAGE (MESSAGEID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, COREHEADERS MEDIUMBLOB, PAYLOAD LONGBLOB, CHANNELCOUNT INTEGER, TYPE TINYINT, JMSTYPE VARCHAR(255), CORRELATIONID VARCHAR(255), CORRELATIONID_BYTES VARBINARY(254), DESTINATION_ID BIGINT, REPLYTO_ID BIGINT, JMSPROPERTIES MEDIUMBLOB, PRIMARY KEY (MESSAGEID))
-LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES FROM JMS_MESSAGE
-INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+LOAD_PAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, PAGE_ORD, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
+LOAD_UNPAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE PAGE_ORD IS NULL and CHANNELID = ? ORDER BY ORD
+UPDATE_RELIABLE_REFS_NOT_PAGED=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=? 
+SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?
+SELECT_EXISTS_REF=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?
+LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES FROM JMS_MESSAGE
+INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 UPDATE_MESSAGE_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT=? WHERE MESSAGEID=?
 DELETE_MESSAGE=DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?
-MESSAGEID_COLUMN=MESSAGEID    
-UPDATE_UNRELIABLE_CHANNELCOUNT=UPDATE JMS_MESSAGE M SET M.CHANNELCOUNT = M.CHANNELCOUNT - 1 WHERE M.MESSAGEID IN (SELECT MR.MESSAGEID FROM JMS_MESSAGE_REFERENCE MR WHERE MR.RELIABLE = 'N' AND MR.CHANNELID = ?)
-DELETE_UNREFFED_MESSAGES=DELETE FROM JMS_MESSAGE WHERE CHANNELCOUNT = 0    
-CREATE_TRANSACTION=CREATE TABLE JMS_TRANSACTION (TRANSACTIONID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTIONID))
+MESSAGEID_COLUMN=MESSAGEID
 INSERT_TRANSACTION=INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?)
 DELETE_TRANSACTION=DELETE FROM JMS_TRANSACTION WHERE TRANSACTIONID = ?
-SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JMS_TRANSACTION         
-DELETE_ALL_TRANSACTIONS=DELETE FROM JMS_TRANSACTION
-CREATE_COUNTER=CREATE TABLE JMS_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))
+SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JMS_TRANSACTION
 UPDATE_COUNTER=UPDATE JMS_COUNTER SET NEXT_ID = ? WHERE NAME=?
-SELECT_COUNTER=SELECT NEXT_ID FROM JMS_COUNTER WHERE NAME=?
+SELECT_COUNTER=SELECT NEXT_ID FROM JMS_COUNTER WHERE NAME=? FOR UPDATE
 INSERT_COUNTER=INSERT INTO JMS_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
-DELETE_ALL_COUNTERS=DELETE FROM JMS_COUNTER
 SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNELID) FROM JMS_MESSAGE_REFERENCE
       ]]></attribute>
       <attribute name="MaxParams">500</attribute>
    </mbean>
       
-   <mbean code="org.jboss.messaging.core.plugin.DirectExchange"
-      name="jboss.messaging:service=DirectExchange"
-      xmbean-dd="xmdesc/Exchange-xmbean.xml">
+   <!--         
+      
+   <mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
+      name="jboss.messaging:service=QueuePostOffice"
+      xmbean-dd="xmdesc/DefaultPostOffice-xmbean.xml"> 
+      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends> 
       <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
       <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+      <attribute name="PostOfficeName">Queue</attribute>
       <attribute name="DataSource">java:/DefaultDS</attribute>
       <attribute name="CreateTablesOnStartup">true</attribute>
       <attribute name="SqlProperties"><![CDATA[
-CREATE_USER_TABLE=CREATE TABLE JMS_USER (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128), PRIMARY KEY(USERID))
-CREATE_ROLE_TABLE=CREATE TABLE JMS_ROLE (ROLEID VARCHAR(32) NOT NULL, USERID VARCHAR(32) NOT NULL, PRIMARY KEY(USERID, ROLEID))      
-SELECT_PRECONF_CLIENTID=SELECT CLIENTID FROM JMS_USER WHERE USERID=?
-CREATE_MAPPING_TABLE=CREATE TABLE JMS_CHANNEL_MAPPING (ID BIGINT, TYPE CHAR(1), JMS_DEST_NAME VARCHAR(1024), JMS_SUB_NAME VARCHAR(1024), CLIENT_ID VARCHAR(128), SELECTOR VARCHAR(1024), NO_LOCAL CHAR(1), PRIMARY KEY(ID))      
-INSERT_MAPPING=INSERT INTO JMS_CHANNEL_MAPPING (ID, TYPE, JMS_DEST_NAME, JMS_SUB_NAME, CLIENT_ID, SELECTOR, NO_LOCAL) VALUES (?, ?, ?, ?, ?, ?, ?)
-DELETE_MAPPING=DELETE FROM JMS_CHANNEL_MAPPING WHERE ID = ?
-SELECT_ID_FOR_DESTINATION=SELECT ID FROM JMS_CHANNEL_MAPPING WHERE TYPE=? AND JMS_DEST_NAME=?
-SELECT_DURABLE_SUB=SELECT JMS_DEST_NAME, ID, SELECTOR, NO_LOCAL FROM JMS_CHANNEL_MAPPING WHERE CLIENT_ID=? AND JMS_SUB_NAME=?            
-SELECT_SUBSCRIPTIONS_FOR_TOPIC=SELECT ID, CLIENT_ID, JMS_SUB_NAME, SELECTOR, NO_LOCAL FROM JMS_CHANNEL_MAPPING WHERE TYPE='D' AND JMS_DEST_NAME=?      
-      ]]></attribute>
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), CONDITION VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME  = ?
+		]]></attribute>
    </mbean>
    
-   <mbean code="org.jboss.messaging.core.plugin.TopicExchange"
-      name="jboss.messaging:service=TopicExchange"
-      xmbean-dd="xmdesc/Exchange-xmbean.xml">
+   <mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
+      name="jboss.messaging:service=TopicPostOffice"
+      xmbean-dd="xmdesc/DefaultPostOffice-xmbean.xml"> 
+      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
       <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
       <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+      <attribute name="PostOfficeName">Topic</attribute>
       <attribute name="DataSource">java:/DefaultDS</attribute>
       <attribute name="CreateTablesOnStartup">true</attribute>
       <attribute name="SqlProperties"><![CDATA[
-CREATE_USER_TABLE=CREATE TABLE JMS_USER (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128), PRIMARY KEY(USERID))
-CREATE_ROLE_TABLE=CREATE TABLE JMS_ROLE (ROLEID VARCHAR(32) NOT NULL, USERID VARCHAR(32) NOT NULL, PRIMARY KEY(USERID, ROLEID))      
-SELECT_PRECONF_CLIENTID=SELECT CLIENTID FROM JMS_USER WHERE USERID=?
-CREATE_MAPPING_TABLE=CREATE TABLE JMS_CHANNEL_MAPPING (ID BIGINT, TYPE CHAR(1), JMS_DEST_NAME VARCHAR(1024), JMS_SUB_NAME VARCHAR(1024), CLIENT_ID VARCHAR(128), SELECTOR VARCHAR(1024), NO_LOCAL CHAR(1), PRIMARY KEY(ID))      
-INSERT_MAPPING=INSERT INTO JMS_CHANNEL_MAPPING (ID, TYPE, JMS_DEST_NAME, JMS_SUB_NAME, CLIENT_ID, SELECTOR, NO_LOCAL) VALUES (?, ?, ?, ?, ?, ?, ?)
-DELETE_MAPPING=DELETE FROM JMS_CHANNEL_MAPPING WHERE ID = ?
-SELECT_ID_FOR_DESTINATION=SELECT ID FROM JMS_CHANNEL_MAPPING WHERE TYPE=? AND JMS_DEST_NAME=?
-SELECT_DURABLE_SUB=SELECT JMS_DEST_NAME, ID, SELECTOR, NO_LOCAL FROM JMS_CHANNEL_MAPPING WHERE CLIENT_ID=? AND JMS_SUB_NAME=?            
-SELECT_SUBSCRIPTIONS_FOR_TOPIC=SELECT ID, CLIENT_ID, JMS_SUB_NAME, SELECTOR, NO_LOCAL FROM JMS_CHANNEL_MAPPING WHERE TYPE='D' AND JMS_DEST_NAME=?      
-      ]]></attribute>
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), CONDITION VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME  = ?
+		]]></attribute>      
+   </mbean> 
+   
+   -->
+   
+   <mbean code="org.jboss.messaging.core.plugin.ClusteredPostOfficeService"
+      name="jboss.messaging:service=QueuePostOffice"
+      xmbean-dd="xmdesc/ClusteredPostOffice-xmbean.xml"> 
+      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
+      <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+      <attribute name="PostOfficeName">Clustered Queue</attribute>
+      <attribute name="DataSource">java:/DefaultDS</attribute>
+      <attribute name="CreateTablesOnStartup">true</attribute>            
+      <attribute name="SqlProperties"><![CDATA[
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME  = ?
+		]]></attribute>
+      <attribute name="GroupName">cluster1</attribute>
+      <attribute name="StateTimeout">5000</attribute>
+      <attribute name="CastTimeout">5000</attribute>
+      <attribute name="PullSize">1</attribute>
+      <attribute name="StatsSendPeriod">1000</attribute>
+      
+		<attribute name="AsyncChannelConfig">
+         <config>
+	         <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
+	        mcast_port="45568" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
+	        mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="false" ucast_send_buf_size="32000" ip_ttl="32"/>
+		      <AUTOCONF down_thread="false" up_thread="false"/>
+		      <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
+		      <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+		      <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+		      <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+		      <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
+		                   retransmit_timeout="100,200,600,1200,2400,4800"/>
+		      <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
+		      <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
+		      <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
+		      <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
+		      <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>		      
+	      </config>
+      </attribute>      
+      
+      <attribute name="SyncChannelConfig">
+         <config>
+	         <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
+	        mcast_port="45567" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
+	        mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="false" ucast_send_buf_size="32000" ip_ttl="32"/>
+	         <AUTOCONF down_thread="false" up_thread="false"/>
+		      <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
+		      <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+		      <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+		      <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+		      <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
+		                   retransmit_timeout="100,200,600,1200,2400,4800"/>
+		      <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
+		      <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
+		      <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
+		      <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
+		      <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
+		      <pbcast.STATE_TRANSFER down_thread="false" up_thread="false"/>
+	      </config>   
+      </attribute>                        
    </mbean>   
    
-   <mbean code="org.jboss.jms.server.plugin.JDBCJMSUserManager"
+   <mbean code="org.jboss.messaging.core.plugin.ClusteredPostOfficeService"
+      name="jboss.messaging:service=TopicPostOffice"
+      xmbean-dd="xmdesc/ClusteredPostOffice-xmbean.xml"> 
+      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
+      <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+      <attribute name="PostOfficeName">Clustered Topic</attribute>
+      <attribute name="DataSource">java:/DefaultDS</attribute>
+      <attribute name="CreateTablesOnStartup">true</attribute>            
+      <attribute name="SqlProperties"><![CDATA[
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME  = ?
+		]]></attribute>
+      <attribute name="GroupName">cluster1</attribute>
+      <attribute name="StateTimeout">5000</attribute>
+      <attribute name="CastTimeout">5000</attribute>
+      <attribute name="PullSize">1</attribute>
+      <attribute name="StatsSendPeriod">1000</attribute>
+      
+      <attribute name="AsyncChannelConfig">
+         <config>
+	         <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
+	        mcast_port="45568" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
+	        mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="false" ucast_send_buf_size="32000" ip_ttl="32"/>
+		      <AUTOCONF down_thread="false" up_thread="false"/>
+		      <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
+		      <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+		      <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+		      <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+		      <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
+		                   retransmit_timeout="100,200,600,1200,2400,4800"/>
+		      <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
+		      <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
+		      <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
+		      <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
+		      <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>		      
+	      </config>
+      </attribute>      
+      
+      <attribute name="SyncChannelConfig">
+         <config>
+	         <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
+	        mcast_port="45569" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
+	        mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="false" ucast_send_buf_size="32000" ip_ttl="32"/>
+	         <AUTOCONF down_thread="false" up_thread="false"/>
+		      <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
+		      <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+		      <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+		      <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+		      <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
+		                   retransmit_timeout="100,200,600,1200,2400,4800"/>
+		      <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
+		      <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
+		      <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
+		      <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
+		      <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
+		      <pbcast.STATE_TRANSFER down_thread="false" up_thread="false"/>
+	      </config>   
+      </attribute>       
+   </mbean>
+
+   <mbean code="org.jboss.jms.server.plugin.JDBCJMSUserManagerService"
       name="jboss.messaging:service=JMSUserManager"
       xmbean-dd="xmdesc/JMSUserManager-xmbean.xml">
-      <!-- TODO this insures the fact that dependency exists. However I need to redundantly specifiy
-           the DataSource JNDI name in order to actually get a reference to it. Fix this.
-      -->   
       <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
       <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
       <attribute name="DataSource">java:/DefaultDS</attribute>
-      <attribute name="CreateTablesOnStartup">true</attribute>
-   </mbean> 
+      <attribute name="CreateTablesOnStartup">true</attribute>      
+      <attribute name="SqlProperties"><![CDATA[
+CREATE_USER_TABLE=CREATE TABLE JMS_USER (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128), PRIMARY KEY(USERID))
+CREATE_ROLE_TABLE=CREATE TABLE JMS_ROLE (ROLEID VARCHAR(32) NOT NULL, USERID VARCHAR(32) NOT NULL, PRIMARY KEY(USERID, ROLEID))
+SELECT_PRECONF_CLIENTID=SELECT CLIENTID FROM JMS_USER WHERE USERID=?
+      ]]></attribute>  
+   </mbean>    
    
-   <mbean code="org.jboss.jms.server.plugin.JDBCShutdownLogger"
+	<mbean code="org.jboss.messaging.core.plugin.JDBCShutdownLoggerService"
       name="jboss.messaging:service=ShutdownLogger"
       xmbean-dd="xmdesc/JDBCShutdownLogger-xmbean.xml">
-      <!-- TODO this insures the fact that dependency exists. However I need to redundantly specifiy
-           the DataSource JNDI name in order to actually get a reference to it. Fix this.
-      -->   
       <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
       <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
       <attribute name="DataSource">java:/DefaultDS</attribute>
       <attribute name="CreateTablesOnStartup">true</attribute>
-   </mbean>       
+      <attribute name="SqlProperties"><![CDATA[
+      CREATE_STARTUP=CREATE TABLE JMS_STARTUP (NODE_ID VARCHAR(255) PRIMARY KEY)      
+      SELECT_STARTUP=SELECT NODE_ID FROM JMS_STARTUP WHERE NODE_ID = ?
+      DELETE_STARTUP=DELETE FROM JMS_STARTUP WHERE NODE_ID = ?
+      INSERT_STARTUP=INSERT INTO JMS_STARTUP (NODE_ID) VALUES (?)
+      ]]></attribute>       
+   </mbean>  
       
 </server>
\ No newline at end of file

Modified: trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2006-09-26 14:49:27 UTC (rev 1363)
@@ -41,6 +41,18 @@
       <type>boolean</type>
    </attribute>
    
+   <attribute access="read-write" getMethod="getPostOfficeName" setMethod="setPostOfficeName">
+      <description>The name of the post office</description>
+      <name>PostOfficeName</name>
+      <type>java.lang.String</type>
+   </attribute> 
+   
+   <attribute access="read-write" getMethod="getServerPeer" setMethod="setServerPeer">
+      <description>The ObjectName of the server peer this destination was deployed on</description>
+      <name>ServerPeer</name>
+      <type>javax.management.ObjectName</type>
+   </attribute> 
+   
    <attribute access="read-write" getMethod="getGroupName" setMethod="setGroupName">
       <description>The name of the JGroups group to use</description>
       <name>GroupName</name>
@@ -55,19 +67,19 @@
    
    <attribute access="read-write" getMethod="getCastTimeout" setMethod="setCastTimeout">
       <description>Timeout for getState()</description>
-      <name>CastTimeout/name>
+      <name>CastTimeout</name>
       <type>long</type>
    </attribute>   
    
    <attribute access="read-write" getMethod="getPullSize" setMethod="setPullSize">
       <description>The maximum number of messages to pull in one go from a remote queue when the local queue consumers are starving</description>
-      <name>SyncPullSize</name>
+      <name>PullSize</name>
       <type>int</type>
    </attribute> 
    
    <attribute access="read-write" getMethod="getStatsSendPeriod" setMethod="setStatsSendPeriod">
       <description>The period in milliseconds between a post office casting its statistics across the cluster</description>
-      <name>StatsSendPeriod/name>
+      <name>StatsSendPeriod</name>
       <type>long</type>
    </attribute>    
    

Copied: trunk/src/etc/xmdesc/DefaultPostOffice-xmbean.xml (from rev 1298, trunk/src/etc/xmdesc/SimplePostOffice-xmbean.xml)
===================================================================
--- trunk/src/etc/xmdesc/SimplePostOffice-xmbean.xml	2006-09-17 17:58:08 UTC (rev 1298)
+++ trunk/src/etc/xmdesc/DefaultPostOffice-xmbean.xml	2006-09-26 14:49:27 UTC (rev 1363)
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+   <!DOCTYPE mbean PUBLIC
+      "-//JBoss//DTD JBOSS XMBEAN 1.2//EN"
+      "http://www.jboss.org/j2ee/dtd/jboss_xmbean_1_2.dtd">
+
+<mbean>
+   <description>A simple non-clustered post-office</description>
+   <class>org.jboss.messaging.core.plugin.DefaultPostOfficeService</class>
+
+   <!-- Managed constructors -->
+
+   <!-- Managed attributes -->
+
+   <attribute access="read-only" getMethod="getInstance">
+      <description>The instance to plug into the server peer</description>
+      <name>Instance</name>
+      <type>org.jboss.messaging.core.plugin.contract.MessagingComponent</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="getDataSource" setMethod="setDataSource">
+      <description>The JNDI name of the DataSource used by this post office instance</description>
+      <name>DataSource</name>
+      <type>java.lang.String</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="getTransactionManager" setMethod="setTransactionManager">
+      <description>The ObjectName of the TransactionManager used by this post office instance</description>
+      <name>TransactionManager</name>
+      <type>javax.management.ObjectName</type>
+   </attribute>
+   
+   <attribute access="read-write" getMethod="getSqlProperties" setMethod="setSqlProperties">
+      <description>DML and DDL overrides</description>
+      <name>SqlProperties</name>
+      <type>java.lang.String</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="isCreateTablesOnStartup" setMethod="setCreateTablesOnStartup">
+      <description>Should database tables be created on startup?</description>
+      <name>CreateTablesOnStartup</name>
+      <type>boolean</type>
+   </attribute>
+   
+   <attribute access="read-write" getMethod="getPostOfficeName" setMethod="setPostOfficeName">
+      <description>The name of the post office</description>
+      <name>PostOfficeName</name>
+      <type>java.lang.String</type>
+   </attribute> 
+   
+   <attribute access="read-write" getMethod="getServerPeer" setMethod="setServerPeer">
+      <description>The ObjectName of the server peer this destination was deployed on</description>
+      <name>ServerPeer</name>
+      <type>javax.management.ObjectName</type>
+   </attribute>    
+
+   <!-- Managed operations -->
+
+   <operation>
+      <description>JBoss Service lifecycle operation</description>
+      <name>create</name>
+   </operation>
+
+   <operation>
+      <description>JBoss Service lifecycle operation</description>
+      <name>start</name>
+   </operation>
+
+   <operation>
+      <description>JBoss Service lifecycle operation</description>
+      <name>stop</name>
+   </operation>
+
+   <operation>
+      <description>JBoss Service lifecycle operation</description>
+      <name>destroy</name>
+   </operation>
+     
+</mbean>
\ No newline at end of file

Deleted: trunk/src/etc/xmdesc/SimplePostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/SimplePostOffice-xmbean.xml	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/etc/xmdesc/SimplePostOffice-xmbean.xml	2006-09-26 14:49:27 UTC (rev 1363)
@@ -1,84 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-   <!DOCTYPE mbean PUBLIC
-      "-//JBoss//DTD JBOSS XMBEAN 1.2//EN"
-      "http://www.jboss.org/j2ee/dtd/jboss_xmbean_1_2.dtd">
-
-<mbean>
-   <description>An simple non-clustered post-office</description>
-   <class>org.jboss.messaging.core.plugin.SimplePostOfficeService</class>
-
-   <!-- Managed constructors -->
-
-   <!-- Managed attributes -->
-
-   <attribute access="read-only" getMethod="getInstance">
-      <description>The instance to plug into the server peer</description>
-      <name>Instance</name>
-      <type>org.jboss.messaging.core.plugin.contract.MessagingComponent</type>
-   </attribute>
-
-   <attribute access="read-write" getMethod="getDataSource" setMethod="setDataSource">
-      <description>The JNDI name of the DataSource used by this ChannelMapper instance</description>
-      <name>DataSource</name>
-      <type>java.lang.String</type>
-   </attribute>
-
-   <attribute access="read-write" getMethod="getTransactionManager" setMethod="setTransactionManager">
-      <description>The ObjectName of the TransactionManager used by this ChannelMaper instance</description>
-      <name>TransactionManager</name>
-      <type>javax.management.ObjectName</type>
-   </attribute>
-   
-   <attribute access="read-write" getMethod="getSqlProperties" setMethod="setSqlProperties">
-      <description>DML and DDL overrides</description>
-      <name>SqlProperties</name>
-      <type>java.lang.String</type>
-   </attribute>
-
-   <attribute access="read-write" getMethod="isCreateTablesOnStartup" setMethod="setCreateTablesOnStartup">
-      <description>Should database tables be created on startup?</description>
-      <name>CreateTablesOnStartup</name>
-      <type>boolean</type>
-   </attribute>
-   
-   <attribute access="read-write" getMethod="getPostOfficeName" setMethod="setPostOfficeName">
-      <description>The name of the post office</description>
-      <name>PostOfficeName</name>
-      <type>java.lang.String</type>
-   </attribute> 
-   
-   <attribute access="read-write" getMethod="getServerPeer" setMethod="setServerPeer">
-      <description>The ObjectName of the server peer this destination was deployed on</description>
-      <name>ServerPeer</name>
-      <type>javax.management.ObjectName</type>
-   </attribute>   
-   
-   <attribute access="read-write" getMethod="getServerPeer" setMethod="setServerPeer">
-      <description>The ObjectName of the server peer this destination was deployed on</description>
-      <name>ServerPeer</name>
-      <type>javax.management.ObjectName</type>
-   </attribute>  
-
-   <!-- Managed operations -->
-
-   <operation>
-      <description>JBoss Service lifecycle operation</description>
-      <name>create</name>
-   </operation>
-
-   <operation>
-      <description>JBoss Service lifecycle operation</description>
-      <name>start</name>
-   </operation>
-
-   <operation>
-      <description>JBoss Service lifecycle operation</description>
-      <name>stop</name>
-   </operation>
-
-   <operation>
-      <description>JBoss Service lifecycle operation</description>
-      <name>destroy</name>
-   </operation>
-     
-</mbean>
\ No newline at end of file

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -177,6 +177,8 @@
          Map configuration = new HashMap();
 
          configuration.put(Client.ENABLE_LEASE, String.valueOf(false));
+         
+         log.info("*********** SERVERLOCATOR URI:" + serverLocatorURI);
 
          client = new Client(new InvokerLocator(serverLocatorURI), configuration);     
          

Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -139,7 +139,10 @@
 
       if (params != null)
       {
-         serializationType = (String)params.get("serializationtype");
+         //serializationType = (String)params.get("serializationtype");
+         
+         //Always use jms
+         serializationType = "jms";
       }
             
       while (!completed && count < MAX_RETRIES)

Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -238,12 +238,9 @@
          txRepository.start();
          
          //Did the server crash last time?
-         crashed = shutdownLogger.startup(serverPeerID);
-                            
-         if (crashed)
-         {
-            topicPostOffice.recover();
-         }
+         
+         //TODO do we need this?
+         crashed = shutdownLogger.startup(serverPeerID);                            
 
          initializeRemoting(mbeanServer);
    

Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -84,6 +84,9 @@
          }
       
          String locatorURI = (String)server.getAttribute(connectorObjectName, "InvokerLocator");
+         
+         log.info("******* LOCATOR URI IS " + locatorURI);
+         
          ServerPeer serverPeer = (ServerPeer)server.getAttribute(serverPeerObjectName, "Instance");
          
          connectionFactoryManager = serverPeer.getConnectionFactoryManager();

Modified: trunk/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManager.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManager.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -26,8 +26,8 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -67,14 +67,14 @@
    
    protected Map getDefaultDMLStatements()
    {                
-      Map map = new HashMap();
+      Map map = new LinkedHashMap();
       map.put("SELECT_PRECONF_CLIENTID", "SELECT CLIENTID FROM JMS_USER WHERE USERID=?");
       return map;
    }
    
    protected Map getDefaultDDLStatements()
    {
-      Map map = new HashMap();
+      Map map = new LinkedHashMap();
       map.put("CREATE_USER_TABLE",
               "CREATE TABLE JMS_USER (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128)," +
               " PRIMARY KEY(USERID))");

Copied: trunk/src/main/org/jboss/messaging/core/plugin/DefaultPostOfficeService.java (from rev 1321, trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java	2006-09-19 19:17:09 UTC (rev 1321)
+++ trunk/src/main/org/jboss/messaging/core/plugin/DefaultPostOfficeService.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -0,0 +1,171 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin;
+
+import javax.management.ObjectName;
+import javax.transaction.TransactionManager;
+
+import org.jboss.jms.selector.SelectorFactory;
+import org.jboss.jms.server.QueuedExecutorPool;
+import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.util.ExceptionUtil;
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.plugin.contract.MessageStore;
+import org.jboss.messaging.core.plugin.contract.MessagingComponent;
+import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.plugin.postoffice.DefaultPostOffice;
+import org.jboss.messaging.core.tx.TransactionRepository;
+
+/**
+ * A DefaultPostOfficeService
+ * 
+ * MBean wrapper for a simple post office
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DefaultPostOfficeService extends JDBCServiceSupport
+{
+   private DefaultPostOffice postOffice;
+   
+   private ObjectName serverPeerObjectName;
+   
+   private String officeName;
+   
+   private boolean started;
+   
+   // Constructor ----------------------------------------------------------
+   
+   public DefaultPostOfficeService()
+   {      
+   }
+   
+   // ServerPlugin implementation ------------------------------------------
+   
+   public MessagingComponent getInstance()
+   {
+      return postOffice;
+   }
+   
+   // MBean attributes -----------------------------------------------------
+   
+   public synchronized ObjectName getServerPeer()
+   {
+      return serverPeerObjectName;
+   }
+   
+   public synchronized void setServerPeer(ObjectName on)
+   {
+      if (started)
+      {
+         log.warn("Cannot set attribute when service is started");
+         return;
+      }
+      this.serverPeerObjectName = on;
+   }
+   
+   public synchronized String getPostOfficeName()
+   {
+      return officeName;
+   }
+   
+   public synchronized void setPostOfficeName(String name)
+   {
+      if (started)
+      {
+         log.warn("Cannot set attribute when service is started");
+         return;
+      }
+      this.officeName = name;
+   }
+   
+   // ServiceMBeanSupport overrides ---------------------------------
+   
+   protected synchronized void startService() throws Exception
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Service is already started");
+      }
+      
+      super.startService();
+      
+      try
+      {  
+         TransactionManager tm = getTransactionManagerReference();
+         
+         ServerPeer serverPeer = (ServerPeer)server.getAttribute(serverPeerObjectName, "Instance");
+         
+         MessageStore ms = serverPeer.getMessageStore();
+         
+         PersistenceManager pm = serverPeer.getPersistenceManagerInstance();
+         
+         QueuedExecutorPool pool = serverPeer.getQueuedExecutorPool();
+         
+         TransactionRepository tr = serverPeer.getTxRepository();
+         
+         String nodeId = serverPeer.getServerPeerID();
+         
+         FilterFactory ff = new SelectorFactory();
+               
+         postOffice = new DefaultPostOffice(ds, tm, sqlProperties,
+                                         createTablesOnStartup,
+                                         nodeId, officeName, ms, pm, tr, ff, pool);
+         
+         postOffice.start();
+         
+         started = true;         
+      }
+      catch (Throwable t)
+      {
+         throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
+      } 
+   }
+   
+   protected void stopService() throws Exception
+   {
+      if (!started)
+      {
+         throw new IllegalStateException("Service is not started");
+      }
+      
+      super.stopService();
+      
+      try
+      {      
+         postOffice.stop();
+         
+         postOffice = null;
+               
+         started = false;
+                     
+         log.debug(this + " stopped");
+      }
+      catch (Throwable t)
+      {
+         throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
+      } 
+   }      
+}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -38,6 +38,7 @@
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -1793,149 +1794,6 @@
       Collections.sort(references, MessageOrderComparator.instance);
    }
    
-   /*
-    *
-    * We want to remove any non reliable refs from the database and any corresponding messages if their channel count
-    * has gone to zero
-    * 
-    * TODO
-    * Really - this method only needs to be executed on start up if the server has crashed
-    * We should save a flag in the db at server shutdown and check for this at startup to see if there
-    * was a clean shutdown
-    * 
-    */
-//   protected void removeUnreliableMessageData() throws Exception
-//   {
-//      log.trace("Removing all non-persistent data");
-//      
-//      Connection conn = null;
-//      PreparedStatement psRes = null;
-//      PreparedStatement psUpdate = null;
-//      PreparedStatement psDeleteMsgs = null;
-//      PreparedStatement psDeleteRefs = null;
-//      TransactionWrapper wrap = new TransactionWrapper();
-//          
-//      ResultSet rs = null;
-//      
-//      try
-//      {
-//         conn = ds.getConnection();
-//         
-//         psRes = conn.prepareStatement(getSQLStatement("SELECT_ALL_CHANNELS"));
-//         
-//         psUpdate = conn.prepareStatement(getSQLStatement("UPDATE_UNRELIABLE_CHANNELCOUNT"));
-//                          
-//         rs = psRes.executeQuery();
-//         
-//         while (rs.next())
-//         {
-//            long channelID = rs.getLong(1);
-//            
-//            psUpdate.setLong(1, channelID);
-//            
-//            int rows = psUpdate.executeUpdate();
-//            
-//            if (trace)
-//            {
-//               log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_UNRELIABLE_CHANNELCOUNT"))
-//                     + " updated " + rows + " rows");
-//            }            
-//         }
-//         
-//         psDeleteRefs = conn.prepareStatement(getSQLStatement("DELETE_UNRELIABLE_REFS"));
-//         
-//         int rows = psDeleteRefs.executeUpdate();
-//         
-//         boolean doReOrder = rows > 0;
-//         
-//         if (trace)
-//         {
-//            log.trace(JDBCUtil.statementToString(getSQLStatement("DELETE_UNRELIABLE_REFS"))
-//                  + " deleted " + rows + " rows");
-//         }
-//                  
-//         psDeleteMsgs = conn.prepareStatement(getSQLStatement("DELETE_UNREFFED_MESSAGES"));
-//         
-//         rows = psDeleteMsgs.executeUpdate();
-//         
-//         if (trace)
-//         {
-//            log.trace(JDBCUtil.statementToString(getSQLStatement("DELETE_UNREFFED_MESSAGES"))
-//                  + " deleted " + rows + " rows");
-//         }     
-//      }
-//      catch (Exception e)
-//      {
-//         wrap.exceptionOccurred();
-//         throw e;
-//      }
-//      finally
-//      {
-//         if (rs != null)
-//         {
-//            try
-//            {
-//               rs.close();
-//            }
-//            catch (Throwable e)
-//            {
-//            }
-//         }
-//         if (psRes != null)
-//         {
-//            try
-//            {
-//               psRes.close();
-//            }
-//            catch (Throwable e)
-//            {
-//            }
-//         }
-//         if (psUpdate != null)
-//         {
-//            try
-//            {
-//               psUpdate.close();
-//            }
-//            catch (Throwable e)
-//            {
-//            }
-//         }
-//         if (psDeleteMsgs != null)
-//         {
-//            try
-//            {
-//               psDeleteMsgs.close();
-//            }
-//            catch (Throwable e)
-//            {
-//            }
-//         }
-//         if (psDeleteRefs != null)
-//         {
-//            try
-//            {
-//               psDeleteRefs.close();
-//            }
-//            catch (Throwable e)
-//            {
-//            }
-//         }
-//         if (conn != null)
-//         {
-//            try
-//            {
-//               conn.close();
-//            }
-//            catch (Throwable e)
-//            {
-//            }
-//         }
-//         wrap.end();
-//      }
-//   }
-   
-   
    protected void handleBeforeCommit1PC(List refsToAdd, List refsToRemove, Transaction tx)
       throws Exception
    {
@@ -3147,6 +3005,8 @@
       }
    }  
    
+   
+   
    protected void addReference(long channelID, MessageReference ref, PreparedStatement ps, boolean paged) throws Exception
    {
       if (trace)
@@ -3158,16 +3018,17 @@
       ps.setLong(2, ref.getMessageID());
       ps.setNull(3, Types.BIGINT);
       ps.setString(4, "C");
+      ps.setLong(5, getOrdering());
       if (paged)
       {
-         ps.setLong(5, ref.getPagingOrder());
+         ps.setLong(6, ref.getPagingOrder());
       }
       else
       {
-         ps.setNull(5, Types.BIGINT);
+         ps.setNull(6, Types.BIGINT);
       }
-      ps.setInt(6, ref.getDeliveryCount());
-      ps.setString(7, ref.isReliable() ? "Y" : "N");
+      ps.setInt(7, ref.getDeliveryCount());
+      ps.setString(8, ref.isReliable() ? "Y" : "N");
    }
    
    protected void removeReference(long channelID, MessageReference ref, PreparedStatement ps) throws Exception
@@ -3194,9 +3055,10 @@
       ps.setLong(2, ref.getMessageID());
       ps.setLong(3, tx.getId());
       ps.setString(4, "+");
-      ps.setNull(5, Types.BIGINT);      
-      ps.setInt(6, ref.getDeliveryCount());
-      ps.setString(7, ref.isReliable() ? "Y" : "N");
+      ps.setLong(5, getOrdering());
+      ps.setNull(6, Types.BIGINT);      
+      ps.setInt(7, ref.getDeliveryCount());
+      ps.setString(8, ref.isReliable() ? "Y" : "N");
    }
    
    protected void prepareToRemoveReference(long channelID, MessageReference ref, Transaction tx, PreparedStatement ps)
@@ -3636,12 +3498,12 @@
    
    protected Map getDefaultDDLStatements()
    {
-      Map map = new HashMap();
+      Map map = new LinkedHashMap();
       //Message reference
       map.put("CREATE_MESSAGE_REFERENCE",
               "CREATE TABLE JMS_MESSAGE_REFERENCE (CHANNELID BIGINT, " +
               "MESSAGEID BIGINT, TRANSACTIONID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, " +
-              "DELIVERYCOUNT INTEGER, RELIABLE CHAR(1), PRIMARY KEY(CHANNELID, MESSAGEID))");
+              "DELIVERYCOUNT INTEGER, RELIABLE CHAR(1), LOADED CHAR(1), PRIMARY KEY(CHANNELID, MESSAGEID))");
       map.put("CREATE_IDX_MESSAGE_REF_TX", "CREATE INDEX JMS_MESSAGE_REF_TX ON JMS_MESSAGE_REFERENCE (TRANSACTIONID)");
       map.put("CREATE_IDX_MESSAGE_REF_ORD", "CREATE INDEX JMS_MESSAGE_REF_ORD ON JMS_MESSAGE_REFERENCE (ORD)");
       map.put("CREATE_IDX_MESSAGE_REF_PAGE_ORD", "CREATE INDEX JMS_MESSAGE_REF__PAGE_ORD ON JMS_MESSAGE_REFERENCE (PAGE_ORD)");
@@ -3663,18 +3525,16 @@
       //Counter
       map.put("CREATE_COUNTER",
               "CREATE TABLE JMS_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))");
-      //Sequences
-      map.put("CREATE_ORDERING_SEQUENCE", "CREATE SEQUENCE REF_ORD");
       return map;
    }
       
    protected Map getDefaultDMLStatements()
    {                
-      Map map = new HashMap();
+      Map map = new LinkedHashMap();
       //Message reference
       map.put("INSERT_MESSAGE_REF",
               "INSERT INTO JMS_MESSAGE_REFERENCE (CHANNELID, MESSAGEID, TRANSACTIONID, STATE, ORD, PAGE_ORD, DELIVERYCOUNT, RELIABLE) " +
-              "VALUES (?, ?, ?, ?, NEXT VALUE FOR REF_ORD, ?, ?, ?)");
+              "VALUES (?, ?, ?, ?, ?, ?, ?, ?)");
       map.put("DELETE_MESSAGE_REF", "DELETE FROM JMS_MESSAGE_REFERENCE WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'");
       map.put("UPDATE_MESSAGE_REF",
               "UPDATE JMS_MESSAGE_REFERENCE SET TRANSACTIONID=?, STATE='-' " +
@@ -3690,9 +3550,7 @@
       map.put("LOAD_UNPAGED_REFS",
               "SELECT MESSAGEID, DELIVERYCOUNT, RELIABLE FROM JMS_MESSAGE_REFERENCE " +
               "WHERE PAGE_ORD IS NULL and CHANNELID = ? ORDER BY ORD");
-      map.put("UPDATE_RELIABLE_REFS_NOT_PAGED", "UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?");   
-      map.put("DELETE_UNRELIABLE_REFS", "DELETE FROM JMS_MESSAGE_REFERENCE WHERE RELIABLE = 'N'");
-      map.put("SHIFT_PAGE_ORDER", "UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = PAGE_ORD + ? WHERE CHANNELID = ?");
+      map.put("UPDATE_RELIABLE_REFS_NOT_PAGED", "UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?");       
       map.put("SELECT_MIN_MAX_PAGE_ORD", "SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?");
       map.put("SELECT_EXISTS_REF", "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?");
       //Message
@@ -3709,10 +3567,6 @@
       map.put("UPDATE_MESSAGE_CHANNELCOUNT", "UPDATE JMS_MESSAGE SET CHANNELCOUNT=? WHERE MESSAGEID=?");
       map.put("DELETE_MESSAGE", "DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?");
       map.put("MESSAGEID_COLUMN", "MESSAGEID");
-      map.put("UPDATE_UNRELIABLE_CHANNELCOUNT",
-              "UPDATE JMS_MESSAGE M SET M.CHANNELCOUNT = M.CHANNELCOUNT - 1 WHERE " +
-              "M.MESSAGEID IN (SELECT MR.MESSAGEID FROM JMS_MESSAGE_REFERENCE MR WHERE MR.RELIABLE = 'N' AND MR.CHANNELID = ?)");
-      map.put("DELETE_UNREFFED_MESSAGES", "DELETE FROM JMS_MESSAGE WHERE CHANNELCOUNT = 0");
       //Transaction
       map.put("INSERT_TRANSACTION",
               "INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) " +
@@ -3729,6 +3583,38 @@
    }
    
    // Private -------------------------------------------------------
+   
+   private short orderCount;
+   
+   private synchronized long getOrdering()
+   {
+      //We generate the ordering for the message reference by taking the lowest 48 bits of the current time and
+      //concatenating with a 15 bit rotating counter to form a string of 63 bits which we then place
+      //in the right most bits of a long, giving a positive signed 63 bit integer.
+      
+      //We only have to guarantee ordering per session, so having slight differences of time on different nodes is
+      //not a problem
+      
+      //This is good for about 8919 years - if you're still running JBoss Messaging then, I suggest you need an
+      //upgrade!
+      
+      long order = System.currentTimeMillis();
+      
+      order = order << 15;
+      
+      order = order | orderCount;
+      
+      if (orderCount == Short.MAX_VALUE)
+      {
+         orderCount = 0;
+      }
+      else
+      {
+         orderCount++;
+      }
+      
+      return order;
+   }
 
    // Inner classes -------------------------------------------------
         

Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCShutdownLogger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCShutdownLogger.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCShutdownLogger.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -24,7 +24,7 @@
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
@@ -252,16 +252,16 @@
    
    protected Map getDefaultDDLStatements()
    {
-      Map sql = new HashMap();
+      Map sql = new LinkedHashMap();
       
-      sql.put("CREATE_STARTUP", "CREATE TABLE JMS_STARTUP (NODE_ID CHAR(1) PRIMARY KEY)");
+      sql.put("CREATE_STARTUP", "CREATE TABLE JMS_STARTUP (NODE_ID VARCHAR(255) PRIMARY KEY)");
       
       return sql;
    }
 
    protected Map getDefaultDMLStatements()
    {
-      Map sql = new HashMap();
+      Map sql = new LinkedHashMap();
       
       sql.put("SELECT_STARTUP", "SELECT NODE_ID FROM JMS_STARTUP WHERE NODE_ID = ?");
       sql.put("DELETE_STARTUP", "DELETE FROM JMS_STARTUP WHERE NODE_ID = ?");

Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -24,8 +24,8 @@
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 
@@ -66,9 +66,9 @@
       
    public JDBCSupport()
    {
-      defaultDMLStatements = new HashMap();
+      defaultDMLStatements = new LinkedHashMap();
       
-      defaultDDLStatements = new HashMap();
+      defaultDDLStatements = new LinkedHashMap();
       
       sqlProperties = new Properties();
    }
@@ -123,7 +123,7 @@
             
             if (sqlProperties.get(statementName) == null)
             {
-               throw new IllegalStateException("SQL statement " + statementName + " is not specified in the persistence manager SQL properties");
+               throw new IllegalStateException("SQL statement " + statementName + " is not specified in the SQL properties");
             }
          }
          
@@ -135,7 +135,7 @@
             
             if (sqlProperties.get(statementName) == null)
             {
-               throw new IllegalStateException("SQL statement " + statementName + " is not specified in the persistence manager SQL properties");
+               throw new IllegalStateException("SQL statement " + statementName + " is not specified in the SQL properties");
             }
          }
       }
@@ -204,16 +204,19 @@
              
             String statement = getSQLStatement(statementName);
             
-            try
-            {
-               if (log.isTraceEnabled()) { log.trace("Executing: " + statement); }
-                   
-               conn.createStatement().executeUpdate(statement);
+            if (!"IGNORE".equals(statement))
+            {               
+               try
+               {
+                  if (log.isTraceEnabled()) { log.trace("Executing: " + statement); }
+                      
+                  conn.createStatement().executeUpdate(statement);
+               }
+               catch (SQLException e) 
+               {
+                  log.debug("Failed to execute: " + statement, e);
+               }  
             }
-            catch (SQLException e) 
-            {
-               log.debug("Failed to execute: " + statement, e);
-            }  
          }      
       }
       finally

Deleted: trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -1,171 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin;
-
-import javax.management.ObjectName;
-import javax.transaction.TransactionManager;
-
-import org.jboss.jms.selector.SelectorFactory;
-import org.jboss.jms.server.QueuedExecutorPool;
-import org.jboss.jms.server.ServerPeer;
-import org.jboss.jms.util.ExceptionUtil;
-import org.jboss.messaging.core.FilterFactory;
-import org.jboss.messaging.core.plugin.contract.MessageStore;
-import org.jboss.messaging.core.plugin.contract.MessagingComponent;
-import org.jboss.messaging.core.plugin.contract.PersistenceManager;
-import org.jboss.messaging.core.plugin.postoffice.DefaultPostOffice;
-import org.jboss.messaging.core.tx.TransactionRepository;
-
-/**
- * A SimplePostOfficeService
- * 
- * MBean wrapper for a simple post office
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class SimplePostOfficeService extends JDBCServiceSupport
-{
-   private DefaultPostOffice postOffice;
-   
-   private ObjectName serverPeerObjectName;
-   
-   private String officeName;
-   
-   private boolean started;
-   
-   // Constructor ----------------------------------------------------------
-   
-   public SimplePostOfficeService()
-   {      
-   }
-   
-   // ServerPlugin implementation ------------------------------------------
-   
-   public MessagingComponent getInstance()
-   {
-      return postOffice;
-   }
-   
-   // MBean attributes -----------------------------------------------------
-   
-   public synchronized ObjectName getServerPeer()
-   {
-      return serverPeerObjectName;
-   }
-   
-   public synchronized void setServerPeer(ObjectName on)
-   {
-      if (started)
-      {
-         log.warn("Cannot set attribute when service is started");
-         return;
-      }
-      this.serverPeerObjectName = on;
-   }
-   
-   public synchronized String getPostOfficeName()
-   {
-      return officeName;
-   }
-   
-   public synchronized void setPostOfficeName(String name)
-   {
-      if (started)
-      {
-         log.warn("Cannot set attribute when service is started");
-         return;
-      }
-      this.officeName = name;
-   }
-   
-   // ServiceMBeanSupport overrides ---------------------------------
-   
-   protected synchronized void startService() throws Exception
-   {
-      if (started)
-      {
-         throw new IllegalStateException("Service is already started");
-      }
-      
-      super.startService();
-      
-      try
-      {  
-         TransactionManager tm = getTransactionManagerReference();
-         
-         ServerPeer serverPeer = (ServerPeer)server.getAttribute(serverPeerObjectName, "Instance");
-         
-         MessageStore ms = serverPeer.getMessageStore();
-         
-         PersistenceManager pm = serverPeer.getPersistenceManagerInstance();
-         
-         QueuedExecutorPool pool = serverPeer.getQueuedExecutorPool();
-         
-         TransactionRepository tr = serverPeer.getTxRepository();
-         
-         String nodeId = serverPeer.getServerPeerID();
-         
-         FilterFactory ff = new SelectorFactory();
-               
-         postOffice = new DefaultPostOffice(ds, tm, sqlProperties,
-                                         createTablesOnStartup,
-                                         nodeId, officeName, ms, pm, tr, ff, pool);
-         
-         postOffice.start();
-         
-         started = true;         
-      }
-      catch (Throwable t)
-      {
-         throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
-      } 
-   }
-   
-   protected void stopService() throws Exception
-   {
-      if (!started)
-      {
-         throw new IllegalStateException("Service is not started");
-      }
-      
-      super.stopService();
-      
-      try
-      {      
-         postOffice.stop();
-         
-         postOffice = null;
-               
-         started = false;
-                     
-         log.debug(this + " stopped");
-      }
-      catch (Throwable t)
-      {
-         throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
-      } 
-   }      
-}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -27,7 +27,6 @@
 import java.sql.Types;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -635,10 +634,10 @@
    
    protected Map getDefaultDMLStatements()
    {                
-      Map map = new HashMap();
+      Map map = new LinkedHashMap();
       map.put("INSERT_BINDING",
-               "INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) " +
-               "VALUES (?, ?, ?, ?, ?, ?)");
+              "INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) " +
+              "VALUES (?, ?, ?, ?, ?, ?)");
       map.put("DELETE_BINDING",
               "DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?");
       map.put("LOAD_BINDINGS",
@@ -649,11 +648,11 @@
    
    protected Map getDefaultDDLStatements()
    {
-      Map map = new HashMap();
+      Map map = new LinkedHashMap();
       map.put("CREATE_POSTOFFICE_TABLE",
-              "CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(256), NODE_ID VARCHAR(256)," +
-              "QUEUE_NAME VARCHAR(1024), CONDITION VARCHAR(1024), " +
-              "SELECTOR VARCHAR(1024), CHANNEL_ID BIGINT)");
+              "CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255)," +
+              "QUEUE_NAME VARCHAR(1023), CONDITION VARCHAR(1023), " +
+              "SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)");
       return map;
    }
    

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -130,8 +130,6 @@
    
    private StatsSender statsSender;
    
-   private long statsSendPeriod;
-   
    private boolean started;
       
    public DefaultClusteredPostOffice()
@@ -230,9 +228,7 @@
       this.routerFactory = rf;
       
       this.pullSize = pullSize;
-      
-      this.statsSendPeriod = statsSendPeriod;
-      
+       
       routerMap = new HashMap();
       
       statsSender = new StatsSender(this, statsSendPeriod);
@@ -442,7 +438,7 @@
             //to send the message to the other office instances on the cluster if there are
             //queues on those nodes that need to receive the message
             
-            //FIXME - there is a bug here, numberRemote does not take into account that more than one
+            //TODO - there is an inefficiency here, numberRemote does not take into account that more than one
             //of the number remote may be on the same node, so we could end up multicasting
             //when unicast would do
             if (numberRemote > 0)
@@ -454,7 +450,7 @@
                   //   log.info("unicast no tx");
                      //Unicast - only one node is interested in the message
                      
-                     //FIXME - temporarily commented out until can get unicast to work
+                     //TODO - temporarily commented out until can get unicast to work                     
                      //asyncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId);
                      asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
                   }
@@ -671,17 +667,51 @@
    /*
     * Unicast a message to one members of the group
     */
-   public void asyncSendRequest(ClusterRequest request, Address address) throws Exception
+   public void asyncSendRequest(ClusterRequest request, String nodeId) throws Exception
    {               
+      Address address = this.getAddressForNodeId(nodeId);
+      
+      if (address == null)
+      {
+         throw new IllegalArgumentException("Cannot find address for node " + nodeId);
+      }
+      
       byte[] bytes = writeRequest(request);
             
       Message m = new Message(address, null, bytes);
       
-      //TODO - handle serialization more efficiently
       asyncChannel.send(m);
    }
    
    /*
+    * Unicast a sync request
+    */
+   public Object syncSendRequest(ClusterRequest request, String nodeId, boolean ignoreNoAddress) throws Exception
+   {              
+      Address address = this.getAddressForNodeId(nodeId);
+      
+      if (address == null)
+      {
+         if (ignoreNoAddress)
+         {
+            return null;
+         }
+         else
+         {
+            throw new IllegalArgumentException("Cannot find address for node " + nodeId);
+         }
+      }
+      
+      byte[] bytes = writeRequest(request);
+            
+      Message message = new Message(address, null, bytes);      
+      
+      Object result = controlMessageDispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
+       
+      return result;
+   }
+   
+   /*
     * We put the transaction in the holding area
     */
    public void holdTransaction(TransactionId id, ClusterTransaction tx) throws Exception
@@ -919,19 +949,7 @@
       }
    }
    
-   /*
-    * Unicast a sync request
-    */
-   public Object syncSendRequest(ClusterRequest request, Address address) throws Exception
-   {              
-      byte[] bytes = writeRequest(request);
-            
-      Message message = new Message(address, null, bytes);      
-      
-      Object result = controlMessageDispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
-       
-      return result;
-   }
+  
                
    // Public ------------------------------------------------------------------------------------------
       
@@ -1142,8 +1160,6 @@
       }
    }
       
-   //TODO - Sort out serialization properly
-   
    private byte[] getStateAsBytes() throws Exception
    {
       List bindings = new ArrayList();

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -155,8 +155,6 @@
    {
       List dels = new ArrayList();
       
-      log.info("getting " + number + " deliveries, there are " + messageRefs.size() + " available");
-      
       synchronized (refLock)
       {
          synchronized (deliveryLock)
@@ -183,7 +181,6 @@
             }
             else
             {
-               log.info("Returning an empty list since receivers are ready");
                return Collections.EMPTY_LIST;
             }
          }
@@ -245,7 +242,6 @@
             //refs but there are none available in the channel (either the channel is empty
             //or there are only refs that don't match any selectors)
             //then we should perhaps pull some messages from a remote queue
-            log.info("pulling messages");
             pullMessages();
          }
       }
@@ -296,15 +292,7 @@
          theQueue = pullQueue;
          thePullSize = pullSize;
       }
-       
-      Address fromAddress = office.getAddressForNodeId(theQueue.getNodeId());
-      
-      if (fromAddress == null)
-      {
-         //This is ok - the node might have left the group
-         return;
-      }
-                  
+                
       Transaction tx = tr.createTransaction();
          
       ClusterRequest req = new PullMessagesRequest(this.nodeId, tx.getId(), theQueue.getChannelID(),
@@ -313,8 +301,14 @@
       log.info(System.identityHashCode(this) + " Executing pull messages request for queue " + name +
                " pulling from node " + theQueue.getNodeId() + " to node " + this.nodeId);
       
-      byte[] bytes = (byte[])office.syncSendRequest(req, fromAddress);
+      byte[] bytes = (byte[])office.syncSendRequest(req, theQueue.getNodeId(), true);
       
+      if (bytes == null)
+      {
+         //Ok - node might have left the group
+         return;
+      }
+      
       log.info( System.identityHashCode(this) +" Executed pull messages request");
       
       PullMessagesResponse response = new PullMessagesResponse();
@@ -371,7 +365,7 @@
       {         
          req = new PullMessagesRequest(this.nodeId, tx.getId());
          
-         office.asyncSendRequest(req, fromAddress);
+         office.asyncSendRequest(req, theQueue.getNodeId());
       }
       
    }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -59,9 +59,9 @@
    
    void asyncSendRequest(ClusterRequest request) throws Exception;
    
-   void asyncSendRequest(ClusterRequest request, Address address) throws Exception;
+   void asyncSendRequest(ClusterRequest request, String nodeId) throws Exception;
    
-   Object syncSendRequest(ClusterRequest request, Address address) throws Exception;
+   Object syncSendRequest(ClusterRequest request, String nodeId, boolean ignoreNoAddress) throws Exception;
    
    void holdTransaction(TransactionId id, ClusterTransaction tx) throws Throwable;
    
@@ -76,6 +76,4 @@
    boolean referenceExistsInStorage(long channelID, long messageID) throws Exception;
    
    List getDeliveries(String queueName, int numMessages) throws Exception; 
-   
-   Address getAddressForNodeId(String nodeId) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -170,9 +170,7 @@
          //We need to ack them in memory only
          //since they would have been acked on the pulling node
          LocalClusteredQueue queue = (LocalClusteredQueue)del.getObserver();
-         
-         log.info("i am committing request");
-         
+              
          queue.acknowledgeFromCluster(del);
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -237,23 +237,19 @@
    }
 
    public void activate()
-   {
-      throw new UnsupportedOperationException();
+   {      
    }
 
    public void deactivate()
-   {
-      throw new UnsupportedOperationException();
+   {      
    }
 
    public void load() throws Exception
-   {
-      throw new UnsupportedOperationException();
+   {      
    }
 
    public void unload() throws Exception
-   {
-      throw new UnsupportedOperationException();
+   {      
    }
    
    public boolean isActive()

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -46,16 +46,32 @@
       }
 
       return
-      "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;bind_addr=" + host + ";up_thread=false;down_thread=false):" +
-      "PING(timeout=2000;num_initial_members=3;up_thread=false;down_thread=false):"+
-      "FD(timeout=3000;up_thread=false;down_thread=false):"+
-      "VERIFY_SUSPECT(timeout=1500;up_thread=false;down_thread=false):"+
-      "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
-      "UNICAST(timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
-      "pbcast.STABLE(desired_avg_gossip=10000;up_thread=false;down_thread=false):"+
-      "FRAG(up_thread=false;down_thread=false):"+
-      "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true;up_thread=false;down_thread=false)";
+//      "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;bind_addr=" + host + ";up_thread=false;down_thread=false):" +
+//      "PING(timeout=2000;num_initial_members=3;up_thread=false;down_thread=false):"+
+//      "FD(timeout=3000;up_thread=false;down_thread=false):"+
+//      "VERIFY_SUSPECT(timeout=1500;up_thread=false;down_thread=false):"+
+//      "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
+//      "UNICAST(timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
+//      "pbcast.STABLE(desired_avg_gossip=10000;up_thread=false;down_thread=false):"+
+//      "FRAG(up_thread=false;down_thread=false):"+
+//      "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true;up_thread=false;down_thread=false)";
 
+      "UDP(mcast_recv_buf_size=500000;down_thread=false;ip_mcast=true;mcast_send_buf_size=32000;"+
+          "mcast_port=45566;ucast_recv_buf_size=500000;use_incoming_packet_handler=false;"+
+          "mcast_addr=228.8.8.8;use_outgoing_packet_handler=true;ucast_send_buf_size=32000;ip_ttl=32;"+
+          "bind_addr=127.0.0.1;loopback=true):"+
+      "AUTOCONF(down_thread=false;up_thread=false):"+
+      "PING(timeout=2000;down_thread=false;num_initial_members=3;up_thread=false):"+
+      "MERGE2(max_interval=10000;down_thread=false;min_interval=5000;up_thread=false):"+
+      "FD(timeout=2000;max_tries=3;down_thread=false;up_thread=false;shun=true):"+
+      "VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):"+
+      "pbcast.NAKACK(max_xmit_size=8192;down_thread=false;use_mcast_xmit=true;gc_lag=50;up_thread=false;"+
+                    "retransmit_timeout=100,200,600,1200,2400,4800):"+
+      "UNICAST(timeout=1200,2400,3600;down_thread=false;up_thread=false):"+
+      "pbcast.STABLE(stability_delay=1000;desired_avg_gossip=20000;down_thread=false;max_bytes=0;up_thread=false):"+
+      "FRAG(frag_size=8192;down_thread=false;up_thread=false):"+
+      "VIEW_SYNC(avg_send_interval=60000;down_thread=false;up_thread=false):"+
+      "pbcast.GMS(print_local_addr=true;join_timeout=3000;down_thread=false;join_retry_timeout=2000;up_thread=false;shun=true)";      
    }
    
 
@@ -73,15 +89,22 @@
       }
 
       return
-         "UDP(mcast_addr=228.8.8.8;mcast_port=45568;ip_ttl=32;bind_addr=" + host + ";up_thread=false;down_thread=false):" +
-         "PING(timeout=2000;num_initial_members=3;up_thread=false;down_thread=false):"+
-         "FD(timeout=3000;up_thread=false;down_thread=false):"+
-         "VERIFY_SUSPECT(timeout=1500;up_thread=false;down_thread=false):"+
-         "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
-         "pbcast.STABLE(desired_avg_gossip=10000;up_thread=false;down_thread=false):"+
-         "FRAG(up_thread=false;down_thread=false):"+
-         "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true;up_thread=false;down_thread=false):" +
-         "pbcast.STATE_TRANSFER(up_thread=false;down_thread=false)";     
+      "UDP(mcast_recv_buf_size=500000;down_thread=false;ip_mcast=true;mcast_send_buf_size=32000;"+
+      "mcast_port=45568;ucast_recv_buf_size=500000;use_incoming_packet_handler=false;"+
+      "mcast_addr=228.8.8.8;use_outgoing_packet_handler=true;loopback=true;ucast_send_buf_size=32000;ip_ttl=32;bind_addr=127.0.0.1):"+
+     "AUTOCONF(down_thread=false;up_thread=false):"+
+     "PING(timeout=2000;down_thread=false;num_initial_members=3;up_thread=false):"+
+     "MERGE2(max_interval=10000;down_thread=false;min_interval=5000;up_thread=false):"+
+     "FD(timeout=2000;max_tries=3;down_thread=false;up_thread=false;shun=true):"+
+     "VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):"+
+     "pbcast.NAKACK(max_xmit_size=8192;down_thread=false;use_mcast_xmit=true;gc_lag=50;up_thread=false;"+
+                   "retransmit_timeout=100,200,600,1200,2400,4800):"+
+     "UNICAST(timeout=1200,2400,3600;down_thread=false;up_thread=false):"+
+     "pbcast.STABLE(stability_delay=1000;desired_avg_gossip=20000;down_thread=false;max_bytes=0;up_thread=false):"+
+     "FRAG(frag_size=8192;down_thread=false;up_thread=false):"+
+     "VIEW_SYNC(avg_send_interval=60000;down_thread=false;up_thread=false):"+
+     "pbcast.GMS(print_local_addr=true;join_timeout=3000;down_thread=false;join_retry_timeout=2000;up_thread=false;shun=true):"+
+     "pbcast.STATE_TRANSFER(down_thread=false;up_thread=false)";   
    }
 
    // Attributes ----------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-09-26 14:49:27 UTC (rev 1363)
@@ -226,9 +226,7 @@
          log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());         
          log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());         
          log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-         
-         
-         
+                           
          log.info("trying to consume");
          
          //So we have 150 messages in total - 30 on each node.




More information about the jboss-cvs-commits mailing list