[jboss-cvs] JBoss Messaging SVN: r1363 - in trunk: docs/examples/secure-socket/etc src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/jms/server/plugin src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Sep 26 10:49:48 EDT 2006
Author: timfox
Date: 2006-09-26 10:49:27 -0400 (Tue, 26 Sep 2006)
New Revision: 1363
Added:
trunk/src/etc/xmdesc/DefaultPostOffice-xmbean.xml
trunk/src/main/org/jboss/messaging/core/plugin/DefaultPostOfficeService.java
Removed:
trunk/src/etc/xmdesc/SimplePostOffice-xmbean.xml
trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java
Modified:
trunk/docs/examples/secure-socket/etc/messaging-secure-socket-service.xml
trunk/src/etc/server/default/deploy/destinations-service.xml
trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
trunk/src/etc/server/default/deploy/messaging-service.xml
trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
trunk/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManager.java
trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/plugin/JDBCShutdownLogger.java
trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Log:
More clustering tweaks
Modified: trunk/docs/examples/secure-socket/etc/messaging-secure-socket-service.xml
===================================================================
--- trunk/docs/examples/secure-socket/etc/messaging-secure-socket-service.xml 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/docs/examples/secure-socket/etc/messaging-secure-socket-service.xml 2006-09-26 14:49:27 UTC (rev 1363)
@@ -21,7 +21,7 @@
<invoker transport="sslsocket">
<attribute name="marshaller" isParam="true">org.jboss.jms.server.remoting.JMSWireFormat</attribute>
<attribute name="unmarshaller" isParam="true">org.jboss.jms.server.remoting.JMSWireFormat</attribute>
- <attribute name="serializationtype" isParam="true">jboss</attribute>
+ <attribute name="serializationtype" isParam="true">jms</attribute>
<attribute name="dataType" isParam="true">jms</attribute>
<attribute name="socket.check_connection" isParam="true">false</attribute>
<attribute name="timeout">0</attribute>
Modified: trunk/src/etc/server/default/deploy/destinations-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/destinations-service.xml 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/etc/server/default/deploy/destinations-service.xml 2006-09-26 14:49:27 UTC (rev 1363)
@@ -16,6 +16,7 @@
name="jboss.messaging.destination:service=Queue,name=DLQ"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=QueuePostOffice</depends>
</mbean>
<!--
@@ -26,6 +27,7 @@
name="jboss.messaging.destination:service=Topic,name=testTopic"
xmbean-dd="xmdesc/Topic-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=TopicPostOffice</depends>
<attribute name="SecurityConfig">
<security>
<role name="guest" read="true" write="true"/>
@@ -39,6 +41,7 @@
name="jboss.messaging.destination:service=Topic,name=securedTopic"
xmbean-dd="xmdesc/Topic-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=TopicPostOffice</depends>
<attribute name="SecurityConfig">
<security>
<role name="publisher" read="true" write="true" create="false"/>
@@ -50,6 +53,7 @@
name="jboss.messaging.destination:service=Topic,name=testDurableTopic"
xmbean-dd="xmdesc/Topic-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=TopicPostOffice</depends>
<attribute name="SecurityConfig">
<security>
<role name="guest" read="true" write="true"/>
@@ -63,6 +67,7 @@
name="jboss.messaging.destination:service=Queue,name=testQueue"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=QueuePostOffice</depends>
<attribute name="SecurityConfig">
<security>
<role name="guest" read="true" write="true"/>
@@ -76,36 +81,42 @@
name="jboss.messaging.destination:service=Queue,name=A"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=QueuePostOffice</depends>
</mbean>
<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.messaging.destination:service=Queue,name=B"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=QueuePostOffice</depends>
</mbean>
<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.messaging.destination:service=Queue,name=C"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=QueuePostOffice</depends>
</mbean>
<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.messaging.destination:service=Queue,name=D"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=QueuePostOffice</depends>
</mbean>
<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.messaging.destination:service=Queue,name=ex"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=QueuePostOffice</depends>
</mbean>
<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.messaging.destination:service=Queue,name=openTopic"
xmbean-dd="xmdesc/Topic-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=QueuePostOffice</depends>
<attribute name="SecurityConfig">
<security>
<role name="guest" read="true" write="true" create="true"/>
@@ -119,6 +130,7 @@
name="jboss.messaging.destination:service=Queue,name=ClusteredQueue1"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=QueuePostOffice</depends>
<attribute name="Clustered">true</attribute>
</mbean>
@@ -126,6 +138,7 @@
name="jboss.messaging.destination:service=Topic,name=ClusteredTopic1"
xmbean-dd="xmdesc/Topic-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=TopicPostOffice</depends>
<attribute name="Clustered">true</attribute>
</mbean>
Modified: trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml 2006-09-26 14:49:27 UTC (rev 1363)
@@ -21,7 +21,7 @@
<attribute name="UsingBatchUpdates">true</attribute>
</mbean>
- <mbean code="org.jboss.messaging.core.plugin.SimplePostOfficeService"
+ <mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
name="jboss.messaging:service=QueuePostOffice"
xmbean-dd="xmdesc/SimplePostOffice-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
@@ -32,7 +32,7 @@
<attribute name="CreateTablesOnStartup">true</attribute>
</mbean>
- <mbean code="org.jboss.messaging.core.plugin.SimplePostOfficeService"
+ <mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
name="jboss.messaging:service=TopicPostOffice"
xmbean-dd="xmdesc/SimplePostOffice-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
@@ -45,7 +45,7 @@
<!--
- Uncomment this and comment out the one above to enable clustered topics
+ Uncomment this to enable clustered destinations
<mbean code="org.jboss.messaging.core.plugin.ClusteredTopicPostOfficeService"
name="jboss.messaging:service=TopicPostOffice"
@@ -55,64 +55,48 @@
<depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
<attribute name="PostOfficeName">Clustered Topic</attribute>
<attribute name="DataSource">java:/DefaultDS</attribute>
- <attribute name="CreateTablesOnStartup">true</attribute>
+ <attribute name="CreateTablesOnStartup">true</attribute>
+ <attribute name="SqlProperties"><![CDATA[
<attribute name="GroupName">cluster1</attribute>
<attribute name="StateTimeout">5000</attribute>
<attribute name="CastTimeout">5000</attribute>
<attribute name="PullSize">1</attribute>
<attribute name="StatsSendPeriod">1000</attribute>
<attribute name="SyncChannelConfig">
- <UDP mcast_addr="228.8.8.8" mcast_port="45568"
- ip_ttl="8" ip_mcast="true"
- mcast_send_buf_size="800000" mcast_recv_buf_size="150000"
- ucast_send_buf_size="800000" ucast_recv_buf_size="150000"
- loopback="false"/>
- <PING timeout="2000" num_initial_members="3"
- up_thread="true" down_thread="true"/>
- <MERGE2 min_interval="10000" max_interval="20000"/>
- <FD shun="true" up_thread="true" down_thread="true"
- timeout="2500" max_tries="5"/>
- <VERIFY_SUSPECT timeout="3000" num_msgs="3"
- up_thread="true" down_thread="true"/>
- <pbcast.NAKACK gc_lag="50" retransmit_timeout="300,600,1200,2400,4800"
- max_xmit_size="8192"
- up_thread="true" down_thread="true"/>
- <pbcast.STABLE desired_avg_gossip="20000"
- up_thread="true" down_thread="true"/>
- <FRAG frag_size="8192"
- down_thread="true" up_thread="true"/>
- <pbcast.GMS join_timeout="5000" join_retry_timeout="2000"
- shun="true" print_local_addr="true"/>
- <pbcast.STATE_TRANSFER up_thread="true" down_thread="true"/>
+ <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
+ mcast_port="45566" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
+ mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="false" ucast_send_buf_size="32000" ip_ttl="32"/>
+ <AUTOCONF down_thread="false" up_thread="false"/>
+ <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
+ <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+ <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+ <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
+ retransmit_timeout="100,200,600,1200,2400,4800"/>
+ <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
+ <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
+ <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
+ <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
</attribute>
- TODO
- Do we need failure detection and group management in this stack ??
-
<attribute name="AsyncChannelConfig">
- <UDP mcast_addr="${jboss.partition.udpGroup:228.1.2.3}" mcast_port="45566"
- ip_ttl="8" ip_mcast="true"
- mcast_send_buf_size="800000" mcast_recv_buf_size="150000"
- ucast_send_buf_size="800000" ucast_recv_buf_size="150000"
- loopback="false"/>
- <PING timeout="2000" num_initial_members="3"
- up_thread="true" down_thread="true"/>
- <MERGE2 min_interval="10000" max_interval="20000"/>
- <FD shun="true" up_thread="true" down_thread="true"
- timeout="2500" max_tries="5"/>
- <VERIFY_SUSPECT timeout="3000" num_msgs="3"
- up_thread="true" down_thread="true"/>
- <pbcast.NAKACK gc_lag="50" retransmit_timeout="300,600,1200,2400,4800"
- max_xmit_size="8192"
- up_thread="true" down_thread="true"/>
- <UNICAST timeout="300,600,1200,2400,4800" window_size="100" min_threshold="10"
- down_thread="true"/>
- <pbcast.STABLE desired_avg_gossip="20000"
- up_thread="true" down_thread="true"/>
- <FRAG frag_size="8192"
- down_thread="true" up_thread="true"/>
- <pbcast.GMS join_timeout="5000" join_retry_timeout="2000"
- shun="true" print_local_addr="true"/>
+ <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
+ mcast_port="45568" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
+ mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="false" ucast_send_buf_size="32000" ip_ttl="32"/>
+ <AUTOCONF down_thread="false" up_thread="false"/>
+ <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
+ <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+ <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+ <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
+ retransmit_timeout="100,200,600,1200,2400,4800"/>
+ <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
+ <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
+ <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
+ <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
+ <pbcast.STATE_TRANSFER down_thread="false" up_thread="false"/>
</attribute>
</mbean>
Modified: trunk/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/messaging-service.xml 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/etc/server/default/deploy/messaging-service.xml 2006-09-26 14:49:27 UTC (rev 1363)
@@ -21,13 +21,13 @@
<arg type="java.lang.String" value="/topic" />
</constructor>
- <depends optional-attribute-name="PersistenceManager">jboss.messaging:service=PersistenceManager</depends>
- <depends optional-attribute-name="QueuePostOffice">jboss.messaging:service=QueuePostOffice</depends>
- <depends optional-attribute-name="TopicPostOffice">jboss.messaging:service=TopicPostOffice</depends>
+ <depends optional-attribute-name="PersistenceManager">jboss.messaging:service=PersistenceManager</depends>
<depends optional-attribute-name="JMSUserManager">jboss.messaging:service=JMSUserManager</depends>
<depends optional-attribute-name="ShutdownLogger">jboss.messaging:service=ShutdownLogger</depends>
- <!-- Set to -1 to completely disable client leasing -->
+ <attribute name="QueuePostOffice">jboss.messaging:service=QueuePostOffice</attribute>
+ <attribute name="TopicPostOffice">jboss.messaging:service=TopicPostOffice</attribute>
+
<attribute name="SecurityDomain">java:/jaas/messaging</attribute>
<attribute name="DefaultSecurityConfig">
<security>
Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2006-09-26 14:49:27 UTC (rev 1363)
@@ -1,14 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
- MySQl persistence deployment descriptor.
+ MySql persistence deployment descriptor.
$Id$
-->
<server>
- <mbean code="org.jboss.messaging.core.plugin.JDBCPersistenceManager"
+ <mbean code="org.jboss.messaging.core.plugin.JDBCPersistenceManagerService"
name="jboss.messaging:service=PersistenceManager"
xmbean-dd="xmdesc/JDBCPersistenceManager-xmbean.xml">
<depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
@@ -17,111 +17,235 @@
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="UsingBatchUpdates">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_MESSAGE_REF=CREATE TABLE JMS_MESSAGE_REFERENCE (CHANNELID BIGINT, MESSAGEID BIGINT, TRANSACTIONID BIGINT, STATE CHAR(1), ORD BIGINT, DELIVERYCOUNT INTEGER, RELIABLE CHAR(1), LOADED CHAR(1), PRIMARY KEY(CHANNELID, MESSAGEID))
+CREATE_MESSAGE_REFERENCE=CREATE TABLE JMS_MESSAGE_REFERENCE (CHANNELID BIGINT, MESSAGEID BIGINT, TRANSACTIONID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, DELIVERYCOUNT INTEGER, RELIABLE CHAR(1), LOADED CHAR(1), PRIMARY KEY(CHANNELID, MESSAGEID))
CREATE_IDX_MESSAGE_REF_TX=CREATE INDEX JMS_MESSAGE_REF_TX ON JMS_MESSAGE_REFERENCE (TRANSACTIONID)
CREATE_IDX_MESSAGE_REF_ORD=CREATE INDEX JMS_MESSAGE_REF_ORD ON JMS_MESSAGE_REFERENCE (ORD)
+CREATE_IDX_MESSAGE_REF_PAGE_ORD=CREATE INDEX JMS_MESSAGE_REF_LOADED ON JMS_MESSAGE_REFERENCE (PAGE_ORD)
CREATE_IDX_MESSAGE_REF_MESSAGEID=CREATE INDEX JMS_MESSAGE_REF_MESSAGEID ON JMS_MESSAGE_REFERENCE (MESSAGEID)
-CREATE_IDX_MESSAGE_REF_LOADED=CREATE INDEX JMS_MESSAGE_REF_LOADED ON JMS_MESSAGE_REFERENCE (LOADED)
CREATE_IDX_MESSAGE_REF_RELIABLE=CREATE INDEX JMS_MESSAGE_REF_RELIABLE ON JMS_MESSAGE_REFERENCE (RELIABLE)
-INSERT_MESSAGE_REF=INSERT INTO JMS_MESSAGE_REFERENCE (CHANNELID, MESSAGEID, TRANSACTIONID, STATE, ORD, DELIVERYCOUNT, RELIABLE, LOADED) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+CREATE_MESSAGE=CREATE TABLE JMS_MESSAGE (MESSAGEID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, COREHEADERS MEDIUMBLOB, PAYLOAD LONGBLOB, CHANNELCOUNT INTEGER, TYPE TINYINT, JMSTYPE VARCHAR(255), CORRELATIONID VARCHAR(255), CORRELATIONID_BYTES VARBINARY(254), DESTINATION VARCHAR(255), REPLYTO VARCHAR(255), JMSPROPERTIES MEDIUMBLOB, PRIMARY KEY (MESSAGEID))
+CREATE_TRANSACTION=CREATE TABLE JMS_TRANSACTION (TRANSACTIONID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTIONID))
+CREATE_COUNTER=CREATE TABLE JMS_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))
+INSERT_MESSAGE_REF=INSERT INTO JMS_MESSAGE_REFERENCE (CHANNELID, MESSAGEID, TRANSACTIONID, STATE, ORD, PAGE_ORD, DELIVERYCOUNT, RELIABLE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
DELETE_MESSAGE_REF=DELETE FROM JMS_MESSAGE_REFERENCE WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'
UPDATE_MESSAGE_REF=UPDATE JMS_MESSAGE_REFERENCE SET TRANSACTIONID=?, STATE='-' WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'
-UPDATE_MESSAGE_REF_NOT_LOADED=UPDATE JMS_MESSAGE_REFERENCE SET LOADED='N' WHERE MESSAGEID=? AND CHANNELID=?
+UPDATE_PAGE_ORDER=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = ? WHERE MESSAGEID=? AND CHANNELID=?
COMMIT_MESSAGE_REF1=UPDATE JMS_MESSAGE_REFERENCE SET STATE='C', TRANSACTIONID = NULL WHERE TRANSACTIONID=? AND STATE='+'
COMMIT_MESSAGE_REF2=DELETE FROM JMS_MESSAGE_REFERENCE WHERE TRANSACTIONID=? AND STATE='-'
ROLLBACK_MESSAGE_REF1=DELETE FROM JMS_MESSAGE_REFERENCE WHERE TRANSACTIONID=? AND STATE='+'
ROLLBACK_MESSAGE_REF2=UPDATE JMS_MESSAGE_REFERENCE SET STATE='C', TRANSACTIONID = NULL WHERE TRANSACTIONID=? AND STATE='-'
-LOAD_REF_INFO=SELECT MESSAGEID, ORD, DELIVERYCOUNT, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID=? AND STATE <> '+' AND LOADED = 'N' AND ORD BETWEEN ? AND ? ORDER BY ORD
-SELECT_COUNT_REFS=SELECT COUNT(MESSAGEID) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID=? AND STATE <> '+' AND LOADED='N'
-UPDATE_RELIABLE_REFS=UPDATE JMS_MESSAGE_REFERENCE SET LOADED='Y' WHERE ORD BETWEEN ? AND ? AND CHANNELID=? AND RELIABLE='Y' AND STATE <> '+'
-UPDATE_RELIABLE_REFS_NOT_LOADED=UPDATE JMS_MESSAGE_REFERENCE SET LOADED='N' WHERE CHANNELID=?
-SELECT_MIN_ORDERING=SELECT MIN(ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID=? AND STATE <> '+' AND LOADED = 'N'
-DELETE_UNRELIABLE_REFS=DELETE FROM JMS_MESSAGE_REFERENCE WHERE RELIABLE = 'N'
-CREATE_MESSAGE=CREATE TABLE JMS_MESSAGE (MESSAGEID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, COREHEADERS MEDIUMBLOB, PAYLOAD LONGBLOB, CHANNELCOUNT INTEGER, TYPE TINYINT, JMSTYPE VARCHAR(255), CORRELATIONID VARCHAR(255), CORRELATIONID_BYTES VARBINARY(254), DESTINATION_ID BIGINT, REPLYTO_ID BIGINT, JMSPROPERTIES MEDIUMBLOB, PRIMARY KEY (MESSAGEID))
-LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES FROM JMS_MESSAGE
-INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+LOAD_PAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, PAGE_ORD, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
+LOAD_UNPAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE PAGE_ORD IS NULL and CHANNELID = ? ORDER BY ORD
+UPDATE_RELIABLE_REFS_NOT_PAGED=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?
+SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?
+SELECT_EXISTS_REF=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?");
+LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES FROM JMS_MESSAGE
+INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
UPDATE_MESSAGE_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT=? WHERE MESSAGEID=?
DELETE_MESSAGE=DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?
-MESSAGEID_COLUMN=MESSAGEID
-UPDATE_UNRELIABLE_CHANNELCOUNT=UPDATE JMS_MESSAGE M SET M.CHANNELCOUNT = M.CHANNELCOUNT - 1 WHERE M.MESSAGEID IN (SELECT MR.MESSAGEID FROM JMS_MESSAGE_REFERENCE MR WHERE MR.RELIABLE = 'N' AND MR.CHANNELID = ?)
-DELETE_UNREFFED_MESSAGES=DELETE FROM JMS_MESSAGE WHERE CHANNELCOUNT = 0
-CREATE_TRANSACTION=CREATE TABLE JMS_TRANSACTION (TRANSACTIONID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTIONID))
+MESSAGEID_COLUMN=MESSAGEID
INSERT_TRANSACTION=INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JMS_TRANSACTION WHERE TRANSACTIONID = ?
-SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JMS_TRANSACTION
-DELETE_ALL_TRANSACTIONS=DELETE FROM JMS_TRANSACTION
-CREATE_COUNTER=CREATE TABLE JMS_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))
+SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JMS_TRANSACTION
UPDATE_COUNTER=UPDATE JMS_COUNTER SET NEXT_ID = ? WHERE NAME=?
-SELECT_COUNTER=SELECT NEXT_ID FROM JMS_COUNTER WHERE NAME=?
+SELECT_COUNTER=SELECT NEXT_ID FROM JMS_COUNTER WHERE NAME=? FOR UPDATE
INSERT_COUNTER=INSERT INTO JMS_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
-DELETE_ALL_COUNTERS=DELETE FROM JMS_COUNTER
SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNELID) FROM JMS_MESSAGE_REFERENCE
]]></attribute>
<attribute name="MaxParams">500</attribute>
</mbean>
- <mbean code="org.jboss.messaging.core.plugin.DirectExchange"
- name="jboss.messaging:service=DirectExchange"
- xmbean-dd="xmdesc/Exchange-xmbean.xml">
+ <!--
+
+ <mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
+ name="jboss.messaging:service=QueuePostOffice"
+ xmbean-dd="xmdesc/SimplePostOffice-xmbean.xml">
+ <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
<depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
<depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+ <attribute name="PostOfficeName">Queue</attribute>
<attribute name="DataSource">java:/DefaultDS</attribute>
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_USER_TABLE=CREATE TABLE JMS_USER (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128), PRIMARY KEY(USERID))
-CREATE_ROLE_TABLE=CREATE TABLE JMS_ROLE (ROLEID VARCHAR(32) NOT NULL, USERID VARCHAR(32) NOT NULL, PRIMARY KEY(USERID, ROLEID))
-SELECT_PRECONF_CLIENTID=SELECT CLIENTID FROM JMS_USER WHERE USERID=?
-CREATE_MAPPING_TABLE=CREATE TABLE JMS_CHANNEL_MAPPING (ID BIGINT, TYPE CHAR(1), JMS_DEST_NAME VARCHAR(1024), JMS_SUB_NAME VARCHAR(1024), CLIENT_ID VARCHAR(128), SELECTOR VARCHAR(1024), NO_LOCAL CHAR(1), PRIMARY KEY(ID))
-INSERT_MAPPING=INSERT INTO JMS_CHANNEL_MAPPING (ID, TYPE, JMS_DEST_NAME, JMS_SUB_NAME, CLIENT_ID, SELECTOR, NO_LOCAL) VALUES (?, ?, ?, ?, ?, ?, ?)
-DELETE_MAPPING=DELETE FROM JMS_CHANNEL_MAPPING WHERE ID = ?
-SELECT_ID_FOR_DESTINATION=SELECT ID FROM JMS_CHANNEL_MAPPING WHERE TYPE=? AND JMS_DEST_NAME=?
-SELECT_DURABLE_SUB=SELECT JMS_DEST_NAME, ID, SELECTOR, NO_LOCAL FROM JMS_CHANNEL_MAPPING WHERE CLIENT_ID=? AND JMS_SUB_NAME=?
-SELECT_SUBSCRIPTIONS_FOR_TOPIC=SELECT ID, CLIENT_ID, JMS_SUB_NAME, SELECTOR, NO_LOCAL FROM JMS_CHANNEL_MAPPING WHERE TYPE='D' AND JMS_DEST_NAME=?
- ]]></attribute>
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), CONDITION VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME = ?
+ ]]></attribute>
</mbean>
- <mbean code="org.jboss.messaging.core.plugin.TopicExchange"
- name="jboss.messaging:service=TopicExchange"
- xmbean-dd="xmdesc/Exchange-xmbean.xml">
+ <mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
+ name="jboss.messaging:service=TopicPostOffice"
+ xmbean-dd="xmdesc/SimplePostOffice-xmbean.xml">
+ <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
<depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
<depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+ <attribute name="PostOfficeName">Topic</attribute>
<attribute name="DataSource">java:/DefaultDS</attribute>
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_USER_TABLE=CREATE TABLE JMS_USER (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128), PRIMARY KEY(USERID))
-CREATE_ROLE_TABLE=CREATE TABLE JMS_ROLE (ROLEID VARCHAR(32) NOT NULL, USERID VARCHAR(32) NOT NULL, PRIMARY KEY(USERID, ROLEID))
-SELECT_PRECONF_CLIENTID=SELECT CLIENTID FROM JMS_USER WHERE USERID=?
-CREATE_MAPPING_TABLE=CREATE TABLE JMS_CHANNEL_MAPPING (ID BIGINT, TYPE CHAR(1), JMS_DEST_NAME VARCHAR(1024), JMS_SUB_NAME VARCHAR(1024), CLIENT_ID VARCHAR(128), SELECTOR VARCHAR(1024), NO_LOCAL CHAR(1), PRIMARY KEY(ID))
-INSERT_MAPPING=INSERT INTO JMS_CHANNEL_MAPPING (ID, TYPE, JMS_DEST_NAME, JMS_SUB_NAME, CLIENT_ID, SELECTOR, NO_LOCAL) VALUES (?, ?, ?, ?, ?, ?, ?)
-DELETE_MAPPING=DELETE FROM JMS_CHANNEL_MAPPING WHERE ID = ?
-SELECT_ID_FOR_DESTINATION=SELECT ID FROM JMS_CHANNEL_MAPPING WHERE TYPE=? AND JMS_DEST_NAME=?
-SELECT_DURABLE_SUB=SELECT JMS_DEST_NAME, ID, SELECTOR, NO_LOCAL FROM JMS_CHANNEL_MAPPING WHERE CLIENT_ID=? AND JMS_SUB_NAME=?
-SELECT_SUBSCRIPTIONS_FOR_TOPIC=SELECT ID, CLIENT_ID, JMS_SUB_NAME, SELECTOR, NO_LOCAL FROM JMS_CHANNEL_MAPPING WHERE TYPE='D' AND JMS_DEST_NAME=?
- ]]></attribute>
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), CONDITION VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME = ?
+ ]]></attribute>
+ </mbean>
+
+ -->
+
+ <mbean code="org.jboss.messaging.core.plugin.ClusteredPostOfficeService"
+ name="jboss.messaging:service=QueuePostOffice"
+ xmbean-dd="xmdesc/ClusteredPostOffice-xmbean.xml">
+ <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
+ <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+ <attribute name="PostOfficeName">Clustered Queue</attribute>
+ <attribute name="DataSource">java:/DefaultDS</attribute>
+ <attribute name="CreateTablesOnStartup">true</attribute>
+ <attribute name="SqlProperties"><![CDATA[
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME = ?
+ ]]></attribute>
+ <attribute name="GroupName">cluster1</attribute>
+ <attribute name="StateTimeout">5000</attribute>
+ <attribute name="CastTimeout">5000</attribute>
+ <attribute name="PullSize">1</attribute>
+ <attribute name="StatsSendPeriod">1000</attribute>
+
+ <attribute name="AsyncChannelConfig">
+ <config>
+ <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
+ mcast_port="45568" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
+ mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="false" ucast_send_buf_size="32000" ip_ttl="32"/>
+ <AUTOCONF down_thread="false" up_thread="false"/>
+ <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
+ <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+ <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+ <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
+ retransmit_timeout="100,200,600,1200,2400,4800"/>
+ <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
+ <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
+ <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
+ <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
+ </config>
+ </attribute>
+
+ <attribute name="SyncChannelConfig">
+ <config>
+ <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
+ mcast_port="45567" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
+ mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="false" ucast_send_buf_size="32000" ip_ttl="32"/>
+ <AUTOCONF down_thread="false" up_thread="false"/>
+ <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
+ <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+ <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+ <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
+ retransmit_timeout="100,200,600,1200,2400,4800"/>
+ <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
+ <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
+ <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
+ <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
+ <pbcast.STATE_TRANSFER down_thread="false" up_thread="false"/>
+ </config>
+ </attribute>
</mbean>
- <mbean code="org.jboss.jms.server.plugin.JDBCJMSUserManager"
+ <mbean code="org.jboss.messaging.core.plugin.ClusteredPostOfficeService"
+ name="jboss.messaging:service=TopicPostOffice"
+ xmbean-dd="xmdesc/ClusteredPostOffice-xmbean.xml">
+ <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
+ <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+ <attribute name="PostOfficeName">Clustered Topic</attribute>
+ <attribute name="DataSource">java:/DefaultDS</attribute>
+ <attribute name="CreateTablesOnStartup">true</attribute>
+ <attribute name="SqlProperties"><![CDATA[
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME = ?
+ ]]></attribute>
+ <attribute name="GroupName">cluster1</attribute>
+ <attribute name="StateTimeout">5000</attribute>
+ <attribute name="CastTimeout">5000</attribute>
+ <attribute name="PullSize">1</attribute>
+ <attribute name="StatsSendPeriod">1000</attribute>
+
+ <attribute name="AsyncChannelConfig">
+ <config>
+ <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
+ mcast_port="45568" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
+ mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="false" ucast_send_buf_size="32000" ip_ttl="32"/>
+ <AUTOCONF down_thread="false" up_thread="false"/>
+ <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
+ <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+ <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+ <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
+ retransmit_timeout="100,200,600,1200,2400,4800"/>
+ <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
+ <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
+ <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
+ <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
+ </config>
+ </attribute>
+
+ <attribute name="SyncChannelConfig">
+ <config>
+ <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
+ mcast_port="45569" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
+ mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="false" ucast_send_buf_size="32000" ip_ttl="32"/>
+ <AUTOCONF down_thread="false" up_thread="false"/>
+ <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
+ <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+ <FD timeout="2000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+ <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
+ <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
+ retransmit_timeout="100,200,600,1200,2400,4800"/>
+ <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
+ <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
+ <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
+ <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
+ <pbcast.STATE_TRANSFER down_thread="false" up_thread="false"/>
+ </config>
+ </attribute>
+ </mbean>
+
+ <mbean code="org.jboss.jms.server.plugin.JDBCJMSUserManagerService"
name="jboss.messaging:service=JMSUserManager"
xmbean-dd="xmdesc/JMSUserManager-xmbean.xml">
- <!-- TODO this insures the fact that dependency exists. However I need to redundantly specifiy
- the DataSource JNDI name in order to actually get a reference to it. Fix this.
- -->
<depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
<depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
<attribute name="DataSource">java:/DefaultDS</attribute>
- <attribute name="CreateTablesOnStartup">true</attribute>
- </mbean>
+ <attribute name="CreateTablesOnStartup">true</attribute>
+ <attribute name="SqlProperties"><![CDATA[
+CREATE_USER_TABLE=CREATE TABLE JMS_USER (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128), PRIMARY KEY(USERID))
+CREATE_ROLE_TABLE=CREATE TABLE JMS_ROLE (ROLEID VARCHAR(32) NOT NULL, USERID VARCHAR(32) NOT NULL, PRIMARY KEY(USERID, ROLEID))
+SELECT_PRECONF_CLIENTID=SELECT CLIENTID FROM JMS_USER WHERE USERID=?
+ ]]></attribute>
+ </mbean>
- <mbean code="org.jboss.jms.server.plugin.JDBCShutdownLogger"
+ <mbean code="org.jboss.messaging.core.plugin.JDBCShutdownLoggerService"
name="jboss.messaging:service=ShutdownLogger"
xmbean-dd="xmdesc/JDBCShutdownLogger-xmbean.xml">
- <!-- TODO this insures the fact that dependency exists. However I need to redundantly specifiy
- the DataSource JNDI name in order to actually get a reference to it. Fix this.
- -->
<depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
<depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
<attribute name="DataSource">java:/DefaultDS</attribute>
<attribute name="CreateTablesOnStartup">true</attribute>
- </mbean>
+ <attribute name="SqlProperties"><![CDATA[
+ CREATE_STARTUP=CREATE TABLE JMS_STARTUP (NODE_ID VARCHAR(255) PRIMARY KEY)
+ SELECT_STARTUP=SELECT NODE_ID FROM JMS_STARTUP WHERE NODE_ID = ?
+ DELETE_STARTUP=DELETE FROM JMS_STARTUP WHERE NODE_ID = ?
+ INSERT_STARTUP=INSERT INTO JMS_STARTUP (NODE_ID) VALUES (?)
+ ]]></attribute>
+ </mbean>
</server>
\ No newline at end of file
Modified: trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml 2006-09-26 14:49:27 UTC (rev 1363)
@@ -41,6 +41,18 @@
<type>boolean</type>
</attribute>
+ <attribute access="read-write" getMethod="getPostOfficeName" setMethod="setPostOfficeName">
+ <description>The name of the post office</description>
+ <name>PostOfficeName</name>
+ <type>java.lang.String</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getServerPeer" setMethod="setServerPeer">
+ <description>The ObjectName of the server peer this destination was deployed on</description>
+ <name>ServerPeer</name>
+ <type>javax.management.ObjectName</type>
+ </attribute>
+
<attribute access="read-write" getMethod="getGroupName" setMethod="setGroupName">
<description>The name of the JGroups group to use</description>
<name>GroupName</name>
@@ -55,19 +67,19 @@
<attribute access="read-write" getMethod="getCastTimeout" setMethod="setCastTimeout">
<description>Timeout for getState()</description>
- <name>CastTimeout/name>
+ <name>CastTimeout</name>
<type>long</type>
</attribute>
<attribute access="read-write" getMethod="getPullSize" setMethod="setPullSize">
<description>The maximum number of message to pull in one go from a remote queue when the local queue consumers are starving</description>
- <name>SyncPullSize</name>
+ <name>PullSize</name>
<type>int</type>
</attribute>
<attribute access="read-write" getMethod="getStatsSendPeriod" setMethod="setStatsSendPeriod">
<description>The period in milliseconds between a post office casting it's statistics across the cluster</description>
- <name>StatsSendPeriod/name>
+ <name>StatsSendPeriod</name>
<type>long</type>
</attribute>
Copied: trunk/src/etc/xmdesc/DefaultPostOffice-xmbean.xml (from rev 1298, trunk/src/etc/xmdesc/SimplePostOffice-xmbean.xml)
===================================================================
--- trunk/src/etc/xmdesc/SimplePostOffice-xmbean.xml 2006-09-17 17:58:08 UTC (rev 1298)
+++ trunk/src/etc/xmdesc/DefaultPostOffice-xmbean.xml 2006-09-26 14:49:27 UTC (rev 1363)
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+ <!DOCTYPE mbean PUBLIC
+ "-//JBoss//DTD JBOSS XMBEAN 1.2//EN"
+ "http://www.jboss.org/j2ee/dtd/jboss_xmbean_1_2.dtd">
+
+<mbean>
+ <description>An simple non-clustered post-office</description>
+ <class>org.jboss.messaging.core.plugin.DefaultPostOfficeService</class>
+
+ <!-- Managed constructors -->
+
+ <!-- Managed attributes -->
+
+ <attribute access="read-only" getMethod="getInstance">
+ <description>The instance to plug into the server peer</description>
+ <name>Instance</name>
+ <type>org.jboss.messaging.core.plugin.contract.MessagingComponent</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getDataSource" setMethod="setDataSource">
+ <description>The JNDI name of the DataSource used by this ChannelMapper instance</description>
+ <name>DataSource</name>
+ <type>java.lang.String</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getTransactionManager" setMethod="setTransactionManager">
+ <description>The ObjectName of the TransactionManager used by this ChannelMaper instance</description>
+ <name>TransactionManager</name>
+ <type>javax.management.ObjectName</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getSqlProperties" setMethod="setSqlProperties">
+ <description>DML and DDL overrides</description>
+ <name>SqlProperties</name>
+ <type>java.lang.String</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="isCreateTablesOnStartup" setMethod="setCreateTablesOnStartup">
+ <description>Should database tables be created on startup?</description>
+ <name>CreateTablesOnStartup</name>
+ <type>boolean</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getPostOfficeName" setMethod="setPostOfficeName">
+ <description>The name of the post office</description>
+ <name>PostOfficeName</name>
+ <type>java.lang.String</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getServerPeer" setMethod="setServerPeer">
+ <description>The ObjectName of the server peer this destination was deployed on</description>
+ <name>ServerPeer</name>
+ <type>javax.management.ObjectName</type>
+ </attribute>
+
+ <!-- Managed operations -->
+
+ <operation>
+ <description>JBoss Service lifecycle operation</description>
+ <name>create</name>
+ </operation>
+
+ <operation>
+ <description>JBoss Service lifecycle operation</description>
+ <name>start</name>
+ </operation>
+
+ <operation>
+ <description>JBoss Service lifecycle operation</description>
+ <name>stop</name>
+ </operation>
+
+ <operation>
+ <description>JBoss Service lifecycle operation</description>
+ <name>destroy</name>
+ </operation>
+
+</mbean>
\ No newline at end of file
Deleted: trunk/src/etc/xmdesc/SimplePostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/SimplePostOffice-xmbean.xml 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/etc/xmdesc/SimplePostOffice-xmbean.xml 2006-09-26 14:49:27 UTC (rev 1363)
@@ -1,84 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
- <!DOCTYPE mbean PUBLIC
- "-//JBoss//DTD JBOSS XMBEAN 1.2//EN"
- "http://www.jboss.org/j2ee/dtd/jboss_xmbean_1_2.dtd">
-
-<mbean>
- <description>An simple non-clustered post-office</description>
- <class>org.jboss.messaging.core.plugin.SimplePostOfficeService</class>
-
- <!-- Managed constructors -->
-
- <!-- Managed attributes -->
-
- <attribute access="read-only" getMethod="getInstance">
- <description>The instance to plug into the server peer</description>
- <name>Instance</name>
- <type>org.jboss.messaging.core.plugin.contract.MessagingComponent</type>
- </attribute>
-
- <attribute access="read-write" getMethod="getDataSource" setMethod="setDataSource">
- <description>The JNDI name of the DataSource used by this ChannelMapper instance</description>
- <name>DataSource</name>
- <type>java.lang.String</type>
- </attribute>
-
- <attribute access="read-write" getMethod="getTransactionManager" setMethod="setTransactionManager">
- <description>The ObjectName of the TransactionManager used by this ChannelMaper instance</description>
- <name>TransactionManager</name>
- <type>javax.management.ObjectName</type>
- </attribute>
-
- <attribute access="read-write" getMethod="getSqlProperties" setMethod="setSqlProperties">
- <description>DML and DDL overrides</description>
- <name>SqlProperties</name>
- <type>java.lang.String</type>
- </attribute>
-
- <attribute access="read-write" getMethod="isCreateTablesOnStartup" setMethod="setCreateTablesOnStartup">
- <description>Should database tables be created on startup?</description>
- <name>CreateTablesOnStartup</name>
- <type>boolean</type>
- </attribute>
-
- <attribute access="read-write" getMethod="getPostOfficeName" setMethod="setPostOfficeName">
- <description>The name of the post office</description>
- <name>PostOfficeName</name>
- <type>java.lang.String</type>
- </attribute>
-
- <attribute access="read-write" getMethod="getServerPeer" setMethod="setServerPeer">
- <description>The ObjectName of the server peer this destination was deployed on</description>
- <name>ServerPeer</name>
- <type>javax.management.ObjectName</type>
- </attribute>
-
- <attribute access="read-write" getMethod="getServerPeer" setMethod="setServerPeer">
- <description>The ObjectName of the server peer this destination was deployed on</description>
- <name>ServerPeer</name>
- <type>javax.management.ObjectName</type>
- </attribute>
-
- <!-- Managed operations -->
-
- <operation>
- <description>JBoss Service lifecycle operation</description>
- <name>create</name>
- </operation>
-
- <operation>
- <description>JBoss Service lifecycle operation</description>
- <name>start</name>
- </operation>
-
- <operation>
- <description>JBoss Service lifecycle operation</description>
- <name>stop</name>
- </operation>
-
- <operation>
- <description>JBoss Service lifecycle operation</description>
- <name>destroy</name>
- </operation>
-
-</mbean>
\ No newline at end of file
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -177,6 +177,8 @@
Map configuration = new HashMap();
configuration.put(Client.ENABLE_LEASE, String.valueOf(false));
+
+ log.info("*********** SERVERLOCATOR URI:" + serverLocatorURI);
client = new Client(new InvokerLocator(serverLocatorURI), configuration);
Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -139,7 +139,10 @@
if (params != null)
{
- serializationType = (String)params.get("serializationtype");
+ //serializationType = (String)params.get("serializationtype");
+
+ //Always use jms
+ serializationType = "jms";
}
while (!completed && count < MAX_RETRIES)
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -238,12 +238,9 @@
txRepository.start();
//Did the server crash last time?
- crashed = shutdownLogger.startup(serverPeerID);
-
- if (crashed)
- {
- topicPostOffice.recover();
- }
+
+ //TODO do we need this?
+ crashed = shutdownLogger.startup(serverPeerID);
initializeRemoting(mbeanServer);
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -84,6 +84,9 @@
}
String locatorURI = (String)server.getAttribute(connectorObjectName, "InvokerLocator");
+
+ log.info("******* LOCATOR URI IS " + locatorURI);
+
ServerPeer serverPeer = (ServerPeer)server.getAttribute(serverPeerObjectName, "Instance");
connectionFactoryManager = serverPeer.getConnectionFactoryManager();
Modified: trunk/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManager.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManager.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -26,8 +26,8 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -67,14 +67,14 @@
protected Map getDefaultDMLStatements()
{
- Map map = new HashMap();
+ Map map = new LinkedHashMap();
map.put("SELECT_PRECONF_CLIENTID", "SELECT CLIENTID FROM JMS_USER WHERE USERID=?");
return map;
}
protected Map getDefaultDDLStatements()
{
- Map map = new HashMap();
+ Map map = new LinkedHashMap();
map.put("CREATE_USER_TABLE",
"CREATE TABLE JMS_USER (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128)," +
" PRIMARY KEY(USERID))");
Copied: trunk/src/main/org/jboss/messaging/core/plugin/DefaultPostOfficeService.java (from rev 1321, trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java 2006-09-19 19:17:09 UTC (rev 1321)
+++ trunk/src/main/org/jboss/messaging/core/plugin/DefaultPostOfficeService.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -0,0 +1,171 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin;
+
+import javax.management.ObjectName;
+import javax.transaction.TransactionManager;
+
+import org.jboss.jms.selector.SelectorFactory;
+import org.jboss.jms.server.QueuedExecutorPool;
+import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.util.ExceptionUtil;
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.plugin.contract.MessageStore;
+import org.jboss.messaging.core.plugin.contract.MessagingComponent;
+import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.plugin.postoffice.DefaultPostOffice;
+import org.jboss.messaging.core.tx.TransactionRepository;
+
+/**
+ * A DefaultPostOfficeService
+ *
+ * MBean wrapper for a simple post office
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DefaultPostOfficeService extends JDBCServiceSupport
+{
+ private DefaultPostOffice postOffice;
+
+ private ObjectName serverPeerObjectName;
+
+ private String officeName;
+
+ private boolean started;
+
+ // Constructor ----------------------------------------------------------
+
+ public DefaultPostOfficeService()
+ {
+ }
+
+ // ServerPlugin implementation ------------------------------------------
+
+ public MessagingComponent getInstance()
+ {
+ return postOffice;
+ }
+
+ // MBean attributes -----------------------------------------------------
+
+ public synchronized ObjectName getServerPeer()
+ {
+ return serverPeerObjectName;
+ }
+
+ public synchronized void setServerPeer(ObjectName on)
+ {
+ if (started)
+ {
+ log.warn("Cannot set attribute when service is started");
+ return;
+ }
+ this.serverPeerObjectName = on;
+ }
+
+ public synchronized String getPostOfficeName()
+ {
+ return officeName;
+ }
+
+ public synchronized void setPostOfficeName(String name)
+ {
+ if (started)
+ {
+ log.warn("Cannot set attribute when service is started");
+ return;
+ }
+ this.officeName = name;
+ }
+
+ // ServiceMBeanSupport overrides ---------------------------------
+
+ protected synchronized void startService() throws Exception
+ {
+ if (started)
+ {
+ throw new IllegalStateException("Service is already started");
+ }
+
+ super.startService();
+
+ try
+ {
+ TransactionManager tm = getTransactionManagerReference();
+
+ ServerPeer serverPeer = (ServerPeer)server.getAttribute(serverPeerObjectName, "Instance");
+
+ MessageStore ms = serverPeer.getMessageStore();
+
+ PersistenceManager pm = serverPeer.getPersistenceManagerInstance();
+
+ QueuedExecutorPool pool = serverPeer.getQueuedExecutorPool();
+
+ TransactionRepository tr = serverPeer.getTxRepository();
+
+ String nodeId = serverPeer.getServerPeerID();
+
+ FilterFactory ff = new SelectorFactory();
+
+ postOffice = new DefaultPostOffice(ds, tm, sqlProperties,
+ createTablesOnStartup,
+ nodeId, officeName, ms, pm, tr, ff, pool);
+
+ postOffice.start();
+
+ started = true;
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
+ }
+ }
+
+ protected void stopService() throws Exception
+ {
+ if (!started)
+ {
+ throw new IllegalStateException("Service is not started");
+ }
+
+ super.stopService();
+
+ try
+ {
+ postOffice.stop();
+
+ postOffice = null;
+
+ started = false;
+
+ log.debug(this + " stopped");
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
+ }
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -38,6 +38,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -1793,149 +1794,6 @@
Collections.sort(references, MessageOrderComparator.instance);
}
- /*
- *
- * We want to remove any non reliable refs from the database and any corresponding messages if their channel count
- * has gone to zero
- *
- * TODO
- * Really - this method only needs to be executed on start up if the server has crashed
- * We should save a flag in the db at server shutdown and check for this at startup to see if there
- * was a clean shutdown
- *
- */
-// protected void removeUnreliableMessageData() throws Exception
-// {
-// log.trace("Removing all non-persistent data");
-//
-// Connection conn = null;
-// PreparedStatement psRes = null;
-// PreparedStatement psUpdate = null;
-// PreparedStatement psDeleteMsgs = null;
-// PreparedStatement psDeleteRefs = null;
-// TransactionWrapper wrap = new TransactionWrapper();
-//
-// ResultSet rs = null;
-//
-// try
-// {
-// conn = ds.getConnection();
-//
-// psRes = conn.prepareStatement(getSQLStatement("SELECT_ALL_CHANNELS"));
-//
-// psUpdate = conn.prepareStatement(getSQLStatement("UPDATE_UNRELIABLE_CHANNELCOUNT"));
-//
-// rs = psRes.executeQuery();
-//
-// while (rs.next())
-// {
-// long channelID = rs.getLong(1);
-//
-// psUpdate.setLong(1, channelID);
-//
-// int rows = psUpdate.executeUpdate();
-//
-// if (trace)
-// {
-// log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_UNRELIABLE_CHANNELCOUNT"))
-// + " updated " + rows + " rows");
-// }
-// }
-//
-// psDeleteRefs = conn.prepareStatement(getSQLStatement("DELETE_UNRELIABLE_REFS"));
-//
-// int rows = psDeleteRefs.executeUpdate();
-//
-// boolean doReOrder = rows > 0;
-//
-// if (trace)
-// {
-// log.trace(JDBCUtil.statementToString(getSQLStatement("DELETE_UNRELIABLE_REFS"))
-// + " deleted " + rows + " rows");
-// }
-//
-// psDeleteMsgs = conn.prepareStatement(getSQLStatement("DELETE_UNREFFED_MESSAGES"));
-//
-// rows = psDeleteMsgs.executeUpdate();
-//
-// if (trace)
-// {
-// log.trace(JDBCUtil.statementToString(getSQLStatement("DELETE_UNREFFED_MESSAGES"))
-// + " deleted " + rows + " rows");
-// }
-// }
-// catch (Exception e)
-// {
-// wrap.exceptionOccurred();
-// throw e;
-// }
-// finally
-// {
-// if (rs != null)
-// {
-// try
-// {
-// rs.close();
-// }
-// catch (Throwable e)
-// {
-// }
-// }
-// if (psRes != null)
-// {
-// try
-// {
-// psRes.close();
-// }
-// catch (Throwable e)
-// {
-// }
-// }
-// if (psUpdate != null)
-// {
-// try
-// {
-// psUpdate.close();
-// }
-// catch (Throwable e)
-// {
-// }
-// }
-// if (psDeleteMsgs != null)
-// {
-// try
-// {
-// psDeleteMsgs.close();
-// }
-// catch (Throwable e)
-// {
-// }
-// }
-// if (psDeleteRefs != null)
-// {
-// try
-// {
-// psDeleteRefs.close();
-// }
-// catch (Throwable e)
-// {
-// }
-// }
-// if (conn != null)
-// {
-// try
-// {
-// conn.close();
-// }
-// catch (Throwable e)
-// {
-// }
-// }
-// wrap.end();
-// }
-// }
-
-
protected void handleBeforeCommit1PC(List refsToAdd, List refsToRemove, Transaction tx)
throws Exception
{
@@ -3147,6 +3005,8 @@
}
}
+
+
protected void addReference(long channelID, MessageReference ref, PreparedStatement ps, boolean paged) throws Exception
{
if (trace)
@@ -3158,16 +3018,17 @@
ps.setLong(2, ref.getMessageID());
ps.setNull(3, Types.BIGINT);
ps.setString(4, "C");
+ ps.setLong(5, getOrdering());
if (paged)
{
- ps.setLong(5, ref.getPagingOrder());
+ ps.setLong(6, ref.getPagingOrder());
}
else
{
- ps.setNull(5, Types.BIGINT);
+ ps.setNull(6, Types.BIGINT);
}
- ps.setInt(6, ref.getDeliveryCount());
- ps.setString(7, ref.isReliable() ? "Y" : "N");
+ ps.setInt(7, ref.getDeliveryCount());
+ ps.setString(8, ref.isReliable() ? "Y" : "N");
}
protected void removeReference(long channelID, MessageReference ref, PreparedStatement ps) throws Exception
@@ -3194,9 +3055,10 @@
ps.setLong(2, ref.getMessageID());
ps.setLong(3, tx.getId());
ps.setString(4, "+");
- ps.setNull(5, Types.BIGINT);
- ps.setInt(6, ref.getDeliveryCount());
- ps.setString(7, ref.isReliable() ? "Y" : "N");
+ ps.setLong(5, getOrdering());
+ ps.setNull(6, Types.BIGINT);
+ ps.setInt(7, ref.getDeliveryCount());
+ ps.setString(8, ref.isReliable() ? "Y" : "N");
}
protected void prepareToRemoveReference(long channelID, MessageReference ref, Transaction tx, PreparedStatement ps)
@@ -3636,12 +3498,12 @@
protected Map getDefaultDDLStatements()
{
- Map map = new HashMap();
+ Map map = new LinkedHashMap();
//Message reference
map.put("CREATE_MESSAGE_REFERENCE",
"CREATE TABLE JMS_MESSAGE_REFERENCE (CHANNELID BIGINT, " +
"MESSAGEID BIGINT, TRANSACTIONID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, " +
- "DELIVERYCOUNT INTEGER, RELIABLE CHAR(1), PRIMARY KEY(CHANNELID, MESSAGEID))");
+ "DELIVERYCOUNT INTEGER, RELIABLE CHAR(1), LOADED CHAR(1), PRIMARY KEY(CHANNELID, MESSAGEID))");
map.put("CREATE_IDX_MESSAGE_REF_TX", "CREATE INDEX JMS_MESSAGE_REF_TX ON JMS_MESSAGE_REFERENCE (TRANSACTIONID)");
map.put("CREATE_IDX_MESSAGE_REF_ORD", "CREATE INDEX JMS_MESSAGE_REF_ORD ON JMS_MESSAGE_REFERENCE (ORD)");
map.put("CREATE_IDX_MESSAGE_REF_PAGE_ORD", "CREATE INDEX JMS_MESSAGE_REF__PAGE_ORD ON JMS_MESSAGE_REFERENCE (PAGE_ORD)");
@@ -3663,18 +3525,16 @@
//Counter
map.put("CREATE_COUNTER",
"CREATE TABLE JMS_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))");
- //Sequences
- map.put("CREATE_ORDERING_SEQUENCE", "CREATE SEQUENCE REF_ORD");
return map;
}
protected Map getDefaultDMLStatements()
{
- Map map = new HashMap();
+ Map map = new LinkedHashMap();
//Message reference
map.put("INSERT_MESSAGE_REF",
"INSERT INTO JMS_MESSAGE_REFERENCE (CHANNELID, MESSAGEID, TRANSACTIONID, STATE, ORD, PAGE_ORD, DELIVERYCOUNT, RELIABLE) " +
- "VALUES (?, ?, ?, ?, NEXT VALUE FOR REF_ORD, ?, ?, ?)");
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?)");
map.put("DELETE_MESSAGE_REF", "DELETE FROM JMS_MESSAGE_REFERENCE WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'");
map.put("UPDATE_MESSAGE_REF",
"UPDATE JMS_MESSAGE_REFERENCE SET TRANSACTIONID=?, STATE='-' " +
@@ -3690,9 +3550,7 @@
map.put("LOAD_UNPAGED_REFS",
"SELECT MESSAGEID, DELIVERYCOUNT, RELIABLE FROM JMS_MESSAGE_REFERENCE " +
"WHERE PAGE_ORD IS NULL and CHANNELID = ? ORDER BY ORD");
- map.put("UPDATE_RELIABLE_REFS_NOT_PAGED", "UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?");
- map.put("DELETE_UNRELIABLE_REFS", "DELETE FROM JMS_MESSAGE_REFERENCE WHERE RELIABLE = 'N'");
- map.put("SHIFT_PAGE_ORDER", "UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = PAGE_ORD + ? WHERE CHANNELID = ?");
+ map.put("UPDATE_RELIABLE_REFS_NOT_PAGED", "UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?");
map.put("SELECT_MIN_MAX_PAGE_ORD", "SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?");
map.put("SELECT_EXISTS_REF", "SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?");
//Message
@@ -3709,10 +3567,6 @@
map.put("UPDATE_MESSAGE_CHANNELCOUNT", "UPDATE JMS_MESSAGE SET CHANNELCOUNT=? WHERE MESSAGEID=?");
map.put("DELETE_MESSAGE", "DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?");
map.put("MESSAGEID_COLUMN", "MESSAGEID");
- map.put("UPDATE_UNRELIABLE_CHANNELCOUNT",
- "UPDATE JMS_MESSAGE M SET M.CHANNELCOUNT = M.CHANNELCOUNT - 1 WHERE " +
- "M.MESSAGEID IN (SELECT MR.MESSAGEID FROM JMS_MESSAGE_REFERENCE MR WHERE MR.RELIABLE = 'N' AND MR.CHANNELID = ?)");
- map.put("DELETE_UNREFFED_MESSAGES", "DELETE FROM JMS_MESSAGE WHERE CHANNELCOUNT = 0");
//Transaction
map.put("INSERT_TRANSACTION",
"INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) " +
@@ -3729,6 +3583,38 @@
}
// Private -------------------------------------------------------
+
+ private short orderCount;
+
+ private synchronized long getOrdering()
+ {
+ //We generate the ordering for the message reference by taking the lowest 48 bits of the current time and
+ //concetaning with a 15 bit rotating counter to form a string of 63 bits which we then place
+ //in the right most bits of a long, giving a positive signed 63 bit integer.
+
+ //We only have to guarantee ordering per session, so having slight differences of time on different nodes is
+ //not a problem
+
+ //This is good for about 8919 years - if you're still running JBoss Messaging then, I suggest you need an
+ //upgrade!
+
+ long order = System.currentTimeMillis();
+
+ order = order << 15;
+
+ order = order | orderCount;
+
+ if (orderCount == Short.MAX_VALUE)
+ {
+ orderCount = 0;
+ }
+ else
+ {
+ orderCount++;
+ }
+
+ return order;
+ }
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCShutdownLogger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCShutdownLogger.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCShutdownLogger.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -24,7 +24,7 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
@@ -252,16 +252,16 @@
protected Map getDefaultDDLStatements()
{
- Map sql = new HashMap();
+ Map sql = new LinkedHashMap();
- sql.put("CREATE_STARTUP", "CREATE TABLE JMS_STARTUP (NODE_ID CHAR(1) PRIMARY KEY)");
+ sql.put("CREATE_STARTUP", "CREATE TABLE JMS_STARTUP (NODE_ID VARCHAR(255) PRIMARY KEY)");
return sql;
}
protected Map getDefaultDMLStatements()
{
- Map sql = new HashMap();
+ Map sql = new LinkedHashMap();
sql.put("SELECT_STARTUP", "SELECT NODE_ID FROM JMS_STARTUP WHERE NODE_ID = ?");
sql.put("DELETE_STARTUP", "DELETE FROM JMS_STARTUP WHERE NODE_ID = ?");
Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCSupport.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -24,8 +24,8 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
@@ -66,9 +66,9 @@
public JDBCSupport()
{
- defaultDMLStatements = new HashMap();
+ defaultDMLStatements = new LinkedHashMap();
- defaultDDLStatements = new HashMap();
+ defaultDDLStatements = new LinkedHashMap();
sqlProperties = new Properties();
}
@@ -123,7 +123,7 @@
if (sqlProperties.get(statementName) == null)
{
- throw new IllegalStateException("SQL statement " + statementName + " is not specified in the persistence manager SQL properties");
+ throw new IllegalStateException("SQL statement " + statementName + " is not specified in the SQL properties");
}
}
@@ -135,7 +135,7 @@
if (sqlProperties.get(statementName) == null)
{
- throw new IllegalStateException("SQL statement " + statementName + " is not specified in the persistence manager SQL properties");
+ throw new IllegalStateException("SQL statement " + statementName + " is not specified in the SQL properties");
}
}
}
@@ -204,16 +204,19 @@
String statement = getSQLStatement(statementName);
- try
- {
- if (log.isTraceEnabled()) { log.trace("Executing: " + statement); }
-
- conn.createStatement().executeUpdate(statement);
+ if (!"IGNORE".equals(statement))
+ {
+ try
+ {
+ if (log.isTraceEnabled()) { log.trace("Executing: " + statement); }
+
+ conn.createStatement().executeUpdate(statement);
+ }
+ catch (SQLException e)
+ {
+ log.debug("Failed to execute: " + statement, e);
+ }
}
- catch (SQLException e)
- {
- log.debug("Failed to execute: " + statement, e);
- }
}
}
finally
Deleted: trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/SimplePostOfficeService.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -1,171 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin;
-
-import javax.management.ObjectName;
-import javax.transaction.TransactionManager;
-
-import org.jboss.jms.selector.SelectorFactory;
-import org.jboss.jms.server.QueuedExecutorPool;
-import org.jboss.jms.server.ServerPeer;
-import org.jboss.jms.util.ExceptionUtil;
-import org.jboss.messaging.core.FilterFactory;
-import org.jboss.messaging.core.plugin.contract.MessageStore;
-import org.jboss.messaging.core.plugin.contract.MessagingComponent;
-import org.jboss.messaging.core.plugin.contract.PersistenceManager;
-import org.jboss.messaging.core.plugin.postoffice.DefaultPostOffice;
-import org.jboss.messaging.core.tx.TransactionRepository;
-
-/**
- * A SimplePostOfficeService
- *
- * MBean wrapper for a simple post office
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class SimplePostOfficeService extends JDBCServiceSupport
-{
- private DefaultPostOffice postOffice;
-
- private ObjectName serverPeerObjectName;
-
- private String officeName;
-
- private boolean started;
-
- // Constructor ----------------------------------------------------------
-
- public SimplePostOfficeService()
- {
- }
-
- // ServerPlugin implementation ------------------------------------------
-
- public MessagingComponent getInstance()
- {
- return postOffice;
- }
-
- // MBean attributes -----------------------------------------------------
-
- public synchronized ObjectName getServerPeer()
- {
- return serverPeerObjectName;
- }
-
- public synchronized void setServerPeer(ObjectName on)
- {
- if (started)
- {
- log.warn("Cannot set attribute when service is started");
- return;
- }
- this.serverPeerObjectName = on;
- }
-
- public synchronized String getPostOfficeName()
- {
- return officeName;
- }
-
- public synchronized void setPostOfficeName(String name)
- {
- if (started)
- {
- log.warn("Cannot set attribute when service is started");
- return;
- }
- this.officeName = name;
- }
-
- // ServiceMBeanSupport overrides ---------------------------------
-
- protected synchronized void startService() throws Exception
- {
- if (started)
- {
- throw new IllegalStateException("Service is already started");
- }
-
- super.startService();
-
- try
- {
- TransactionManager tm = getTransactionManagerReference();
-
- ServerPeer serverPeer = (ServerPeer)server.getAttribute(serverPeerObjectName, "Instance");
-
- MessageStore ms = serverPeer.getMessageStore();
-
- PersistenceManager pm = serverPeer.getPersistenceManagerInstance();
-
- QueuedExecutorPool pool = serverPeer.getQueuedExecutorPool();
-
- TransactionRepository tr = serverPeer.getTxRepository();
-
- String nodeId = serverPeer.getServerPeerID();
-
- FilterFactory ff = new SelectorFactory();
-
- postOffice = new DefaultPostOffice(ds, tm, sqlProperties,
- createTablesOnStartup,
- nodeId, officeName, ms, pm, tr, ff, pool);
-
- postOffice.start();
-
- started = true;
- }
- catch (Throwable t)
- {
- throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
- }
- }
-
- protected void stopService() throws Exception
- {
- if (!started)
- {
- throw new IllegalStateException("Service is not started");
- }
-
- super.stopService();
-
- try
- {
- postOffice.stop();
-
- postOffice = null;
-
- started = false;
-
- log.debug(this + " stopped");
- }
- catch (Throwable t)
- {
- throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
- }
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -27,7 +27,6 @@
import java.sql.Types;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -635,10 +634,10 @@
protected Map getDefaultDMLStatements()
{
- Map map = new HashMap();
+ Map map = new LinkedHashMap();
map.put("INSERT_BINDING",
- "INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) " +
- "VALUES (?, ?, ?, ?, ?, ?)");
+ "INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) " +
+ "VALUES (?, ?, ?, ?, ?, ?)");
map.put("DELETE_BINDING",
"DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?");
map.put("LOAD_BINDINGS",
@@ -649,11 +648,11 @@
protected Map getDefaultDDLStatements()
{
- Map map = new HashMap();
+ Map map = new LinkedHashMap();
map.put("CREATE_POSTOFFICE_TABLE",
- "CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(256), NODE_ID VARCHAR(256)," +
- "QUEUE_NAME VARCHAR(1024), CONDITION VARCHAR(1024), " +
- "SELECTOR VARCHAR(1024), CHANNEL_ID BIGINT)");
+ "CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255)," +
+ "QUEUE_NAME VARCHAR(1023), CONDITION VARCHAR(1023), " +
+ "SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)");
return map;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -130,8 +130,6 @@
private StatsSender statsSender;
- private long statsSendPeriod;
-
private boolean started;
public DefaultClusteredPostOffice()
@@ -230,9 +228,7 @@
this.routerFactory = rf;
this.pullSize = pullSize;
-
- this.statsSendPeriod = statsSendPeriod;
-
+
routerMap = new HashMap();
statsSender = new StatsSender(this, statsSendPeriod);
@@ -442,7 +438,7 @@
//to send the message to the other office instances on the cluster if there are
//queues on those nodes that need to receive the message
- //FIXME - there is a bug here, numberRemote does not take into account that more than one
+ //TODO - there is an innefficiency here, numberRemote does not take into account that more than one
//of the number remote may be on the same node, so we could end up multicasting
//when unicast would do
if (numberRemote > 0)
@@ -454,7 +450,7 @@
// log.info("unicast no tx");
//Unicast - only one node is interested in the message
- //FIXME - temporarily commented out until can get unicast to work
+ //TODO - temporarily commented out until can get unicast to work
//asyncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId);
asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
}
@@ -671,17 +667,51 @@
/*
* Unicast a message to one members of the group
*/
- public void asyncSendRequest(ClusterRequest request, Address address) throws Exception
+ public void asyncSendRequest(ClusterRequest request, String nodeId) throws Exception
{
+ Address address = this.getAddressForNodeId(nodeId);
+
+ if (address == null)
+ {
+ throw new IllegalArgumentException("Cannot find address for node " + nodeId);
+ }
+
byte[] bytes = writeRequest(request);
Message m = new Message(address, null, bytes);
- //TODO - handle serialization more efficiently
asyncChannel.send(m);
}
/*
+ * Unicast a sync request
+ */
+ public Object syncSendRequest(ClusterRequest request, String nodeId, boolean ignoreNoAddress) throws Exception
+ {
+ Address address = this.getAddressForNodeId(nodeId);
+
+ if (address == null)
+ {
+ if (ignoreNoAddress)
+ {
+ return null;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot find address for node " + nodeId);
+ }
+ }
+
+ byte[] bytes = writeRequest(request);
+
+ Message message = new Message(address, null, bytes);
+
+ Object result = controlMessageDispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
+
+ return result;
+ }
+
+ /*
* We put the transaction in the holding area
*/
public void holdTransaction(TransactionId id, ClusterTransaction tx) throws Exception
@@ -919,19 +949,7 @@
}
}
- /*
- * Unicast a sync request
- */
- public Object syncSendRequest(ClusterRequest request, Address address) throws Exception
- {
- byte[] bytes = writeRequest(request);
-
- Message message = new Message(address, null, bytes);
-
- Object result = controlMessageDispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
-
- return result;
- }
+
// Public ------------------------------------------------------------------------------------------
@@ -1142,8 +1160,6 @@
}
}
- //TODO - Sort out serialization properly
-
private byte[] getStateAsBytes() throws Exception
{
List bindings = new ArrayList();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -155,8 +155,6 @@
{
List dels = new ArrayList();
- log.info("getting " + number + " deliveries, there are " + messageRefs.size() + " available");
-
synchronized (refLock)
{
synchronized (deliveryLock)
@@ -183,7 +181,6 @@
}
else
{
- log.info("Returning an empty list since receivers are ready");
return Collections.EMPTY_LIST;
}
}
@@ -245,7 +242,6 @@
//refs but there are none available in the channel (either the channel is empty
//or there are only refs that don't match any selectors)
//then we should perhaps pull some messages from a remote queue
- log.info("pulling messages");
pullMessages();
}
}
@@ -296,15 +292,7 @@
theQueue = pullQueue;
thePullSize = pullSize;
}
-
- Address fromAddress = office.getAddressForNodeId(theQueue.getNodeId());
-
- if (fromAddress == null)
- {
- //This is ok - the node might have left the group
- return;
- }
-
+
Transaction tx = tr.createTransaction();
ClusterRequest req = new PullMessagesRequest(this.nodeId, tx.getId(), theQueue.getChannelID(),
@@ -313,8 +301,14 @@
log.info(System.identityHashCode(this) + " Executing pull messages request for queue " + name +
" pulling from node " + theQueue.getNodeId() + " to node " + this.nodeId);
- byte[] bytes = (byte[])office.syncSendRequest(req, fromAddress);
+ byte[] bytes = (byte[])office.syncSendRequest(req, theQueue.getNodeId(), true);
+ if (bytes == null)
+ {
+ //Ok - node might have left the group
+ return;
+ }
+
log.info( System.identityHashCode(this) +" Executed pull messages request");
PullMessagesResponse response = new PullMessagesResponse();
@@ -371,7 +365,7 @@
{
req = new PullMessagesRequest(this.nodeId, tx.getId());
- office.asyncSendRequest(req, fromAddress);
+ office.asyncSendRequest(req, theQueue.getNodeId());
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -59,9 +59,9 @@
void asyncSendRequest(ClusterRequest request) throws Exception;
- void asyncSendRequest(ClusterRequest request, Address address) throws Exception;
+ void asyncSendRequest(ClusterRequest request, String nodeId) throws Exception;
- Object syncSendRequest(ClusterRequest request, Address address) throws Exception;
+ Object syncSendRequest(ClusterRequest request, String nodeId, boolean ignoreNoAddress) throws Exception;
void holdTransaction(TransactionId id, ClusterTransaction tx) throws Throwable;
@@ -76,6 +76,4 @@
boolean referenceExistsInStorage(long channelID, long messageID) throws Exception;
List getDeliveries(String queueName, int numMessages) throws Exception;
-
- Address getAddressForNodeId(String nodeId) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -170,9 +170,7 @@
//We need to ack them in memory only
//since they would have been acked on the pulling node
LocalClusteredQueue queue = (LocalClusteredQueue)del.getObserver();
-
- log.info("i am committing request");
-
+
queue.acknowledgeFromCluster(del);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -237,23 +237,19 @@
}
public void activate()
- {
- throw new UnsupportedOperationException();
+ {
}
public void deactivate()
- {
- throw new UnsupportedOperationException();
+ {
}
public void load() throws Exception
- {
- throw new UnsupportedOperationException();
+ {
}
public void unload() throws Exception
- {
- throw new UnsupportedOperationException();
+ {
}
public boolean isActive()
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -46,16 +46,32 @@
}
return
- "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;bind_addr=" + host + ";up_thread=false;down_thread=false):" +
- "PING(timeout=2000;num_initial_members=3;up_thread=false;down_thread=false):"+
- "FD(timeout=3000;up_thread=false;down_thread=false):"+
- "VERIFY_SUSPECT(timeout=1500;up_thread=false;down_thread=false):"+
- "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
- "UNICAST(timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
- "pbcast.STABLE(desired_avg_gossip=10000;up_thread=false;down_thread=false):"+
- "FRAG(up_thread=false;down_thread=false):"+
- "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true;up_thread=false;down_thread=false)";
+// "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;bind_addr=" + host + ";up_thread=false;down_thread=false):" +
+// "PING(timeout=2000;num_initial_members=3;up_thread=false;down_thread=false):"+
+// "FD(timeout=3000;up_thread=false;down_thread=false):"+
+// "VERIFY_SUSPECT(timeout=1500;up_thread=false;down_thread=false):"+
+// "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
+// "UNICAST(timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
+// "pbcast.STABLE(desired_avg_gossip=10000;up_thread=false;down_thread=false):"+
+// "FRAG(up_thread=false;down_thread=false):"+
+// "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true;up_thread=false;down_thread=false)";
+ "UDP(mcast_recv_buf_size=500000;down_thread=false;ip_mcast=true;mcast_send_buf_size=32000;"+
+ "mcast_port=45566;ucast_recv_buf_size=500000;use_incoming_packet_handler=false;"+
+ "mcast_addr=228.8.8.8;use_outgoing_packet_handler=true;ucast_send_buf_size=32000;ip_ttl=32;"+
+ "bind_addr=127.0.0.1;loopback=true):"+
+ "AUTOCONF(down_thread=false;up_thread=false):"+
+ "PING(timeout=2000;down_thread=false;num_initial_members=3;up_thread=false):"+
+ "MERGE2(max_interval=10000;down_thread=false;min_interval=5000;up_thread=false):"+
+ "FD(timeout=2000;max_tries=3;down_thread=false;up_thread=false;shun=true):"+
+ "VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):"+
+ "pbcast.NAKACK(max_xmit_size=8192;down_thread=false;use_mcast_xmit=true;gc_lag=50;up_thread=false;"+
+ "retransmit_timeout=100,200,600,1200,2400,4800):"+
+ "UNICAST(timeout=1200,2400,3600;down_thread=false;up_thread=false):"+
+ "pbcast.STABLE(stability_delay=1000;desired_avg_gossip=20000;down_thread=false;max_bytes=0;up_thread=false):"+
+ "FRAG(frag_size=8192;down_thread=false;up_thread=false):"+
+ "VIEW_SYNC(avg_send_interval=60000;down_thread=false;up_thread=false):"+
+ "pbcast.GMS(print_local_addr=true;join_timeout=3000;down_thread=false;join_retry_timeout=2000;up_thread=false;shun=true)";
}
@@ -73,15 +89,22 @@
}
return
- "UDP(mcast_addr=228.8.8.8;mcast_port=45568;ip_ttl=32;bind_addr=" + host + ";up_thread=false;down_thread=false):" +
- "PING(timeout=2000;num_initial_members=3;up_thread=false;down_thread=false):"+
- "FD(timeout=3000;up_thread=false;down_thread=false):"+
- "VERIFY_SUSPECT(timeout=1500;up_thread=false;down_thread=false):"+
- "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
- "pbcast.STABLE(desired_avg_gossip=10000;up_thread=false;down_thread=false):"+
- "FRAG(up_thread=false;down_thread=false):"+
- "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true;up_thread=false;down_thread=false):" +
- "pbcast.STATE_TRANSFER(up_thread=false;down_thread=false)";
+ "UDP(mcast_recv_buf_size=500000;down_thread=false;ip_mcast=true;mcast_send_buf_size=32000;"+
+ "mcast_port=45568;ucast_recv_buf_size=500000;use_incoming_packet_handler=false;"+
+ "mcast_addr=228.8.8.8;use_outgoing_packet_handler=true;loopback=true;ucast_send_buf_size=32000;ip_ttl=32;bind_addr=127.0.0.1):"+
+ "AUTOCONF(down_thread=false;up_thread=false):"+
+ "PING(timeout=2000;down_thread=false;num_initial_members=3;up_thread=false):"+
+ "MERGE2(max_interval=10000;down_thread=false;min_interval=5000;up_thread=false):"+
+ "FD(timeout=2000;max_tries=3;down_thread=false;up_thread=false;shun=true):"+
+ "VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):"+
+ "pbcast.NAKACK(max_xmit_size=8192;down_thread=false;use_mcast_xmit=true;gc_lag=50;up_thread=false;"+
+ "retransmit_timeout=100,200,600,1200,2400,4800):"+
+ "UNICAST(timeout=1200,2400,3600;down_thread=false;up_thread=false):"+
+ "pbcast.STABLE(stability_delay=1000;desired_avg_gossip=20000;down_thread=false;max_bytes=0;up_thread=false):"+
+ "FRAG(frag_size=8192;down_thread=false;up_thread=false):"+
+ "VIEW_SYNC(avg_send_interval=60000;down_thread=false;up_thread=false):"+
+ "pbcast.GMS(print_local_addr=true;join_timeout=3000;down_thread=false;join_retry_timeout=2000;up_thread=false;shun=true):"+
+ "pbcast.STATE_TRANSFER(down_thread=false;up_thread=false)";
}
// Attributes ----------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-09-25 14:36:23 UTC (rev 1362)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-09-26 14:49:27 UTC (rev 1363)
@@ -226,9 +226,7 @@
log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
-
-
-
+
log.info("trying to consume");
//So we have 150 messages in total - 30 on each node.
More information about the jboss-cvs-commits
mailing list