JBoss hornetq SVN: r8967 - in branches/Clebert_TMP: src/main/org/hornetq/jms/management/impl and 14 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-03-26 01:08:39 -0400 (Fri, 26 Mar 2010)
New Revision: 8967
Added:
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedJNDI.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedType.java
Modified:
branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java
branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/DestinationControl.java
branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/JMSServerControl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/JMSServerManager.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/management/JMSManagementService.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
branches/Clebert_TMP/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/connection/ExceptionListenerTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
implementing jndi bindings & adding methods to the server manager
Modified: branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/ConnectionFactoryControl.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -38,11 +38,6 @@
String getName();
/**
- * Returns the JNDI bindings associated to this connection factory.
- */
- List<String> getBindings();
-
- /**
* Returns the Client ID of this connection factory (or {@code null} if it is not set.
*/
String getClientID();
Modified: branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/DestinationControl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/DestinationControl.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/DestinationControl.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -33,11 +33,6 @@
String getName();
/**
- * Returns the JNDI binding of this destination.
- */
- String getJNDIBinding();
-
- /**
* Returns the HornetQ address corresponding to this destination.
*/
String getAddress();
Modified: branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -75,6 +75,12 @@
String getSelector();
// Operations ----------------------------------------------------
+
+ /**
+ * Add the JNDI binding to this destination
+ */
+ @Operation(desc = "Adds the queue to another binding")
+ void addJNDI(@Parameter(name = "jndiBinding", desc = "the name of the binding for JNDI") String jndi) throws Exception;
/**
* Lists all the JMS messages in this queue matching the specified filter.
Modified: branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/JMSServerControl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -69,8 +69,7 @@
* @return {@code true} if the queue was created, {@code false} else
*/
@Operation(desc = "Create a JMS Queue", impact = MBeanOperationInfo.ACTION)
- boolean createQueue(@Parameter(name = "name", desc = "Name of the queue to create") String name,
- @Parameter(name = "jndiBinding", desc = "the name of the binding for JNDI") String jndiBinding) throws Exception;
+ boolean createQueue(@Parameter(name = "name", desc = "Name of the queue to create") String name) throws Exception;
/**
* Destroys a JMS Queue with the specified name.
@@ -86,8 +85,7 @@
* @return {@code true} if the topic was created, {@code false} else
*/
@Operation(desc = "Create a JMS Topic", impact = MBeanOperationInfo.ACTION)
- boolean createTopic(@Parameter(name = "name", desc = "Name of the topic to create") String name,
- @Parameter(name = "jndiBinding", desc = "the name of the binding for JNDI") String jndiBinding) throws Exception;
+ boolean createTopic(@Parameter(name = "name", desc = "Name of the topic to create") String name) throws Exception;
/**
* Destroys a JMS Topic with the specified name.
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSConnectionFactoryControlImpl.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -21,10 +21,9 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.api.jms.management.ConnectionFactoryControl;
import org.hornetq.core.management.impl.MBeanInfoHelper;
+import org.hornetq.jms.client.HornetQConnectionFactory;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@@ -40,8 +39,6 @@
private final HornetQConnectionFactory cf;
- private final List<String> bindings;
-
private final String name;
// Static --------------------------------------------------------
@@ -49,24 +46,17 @@
// Constructors --------------------------------------------------
public JMSConnectionFactoryControlImpl(final HornetQConnectionFactory cf,
- final String name,
- final List<String> bindings) throws NotCompliantMBeanException
+ final String name) throws NotCompliantMBeanException
{
super(ConnectionFactoryControl.class);
this.cf = cf;
this.name = name;
- this.bindings = bindings;
}
// Public --------------------------------------------------------
// ManagedConnectionFactoryMBean implementation ------------------
- public List<String> getBindings()
- {
- return bindings;
- }
-
public String getClientID()
{
return cf.getClientID();
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -30,6 +30,7 @@
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.jms.client.SelectorTranslator;
+import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;
@@ -48,11 +49,11 @@
// Attributes ----------------------------------------------------
private final HornetQDestination managedQueue;
+
+ private final JMSServerManager jmsServerManager;
private final QueueControl coreQueueControl;
- private final String binding;
-
private final MessageCounter counter;
// Static --------------------------------------------------------
@@ -85,13 +86,13 @@
public JMSQueueControlImpl(final HornetQDestination managedQueue,
final QueueControl coreQueueControl,
- final String jndiBinding,
+ final JMSServerManager jmsServerManager,
final MessageCounter counter) throws Exception
{
super(JMSQueueControl.class);
this.managedQueue = managedQueue;
+ this.jmsServerManager = jmsServerManager;
this.coreQueueControl = coreQueueControl;
- binding = jndiBinding;
this.counter = counter;
}
@@ -144,11 +145,6 @@
return coreQueueControl.isDurable();
}
- public String getJNDIBinding()
- {
- return binding;
- }
-
public String getDeadLetterAddress()
{
return coreQueueControl.getDeadLetterAddress();
@@ -169,6 +165,16 @@
coreQueueControl.setExpiryAddress(expiryAddres);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.api.jms.management.JMSQueueControl#addJNDI(java.lang.String)
+ */
+ public void addJNDI(String jndi) throws Exception
+ {
+ jmsServerManager.addQueueToJndi(managedQueue.getName(), jndi);
+ }
+
+
+
public boolean removeMessage(final String messageID) throws Exception
{
String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
@@ -355,7 +361,7 @@
MBeanInfoHelper.getMBeanOperationsInfo(JMSQueueControl.class),
info.getNotifications());
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -227,9 +227,9 @@
}
- public boolean createQueue(final String name, final String jndiBinding) throws Exception
+ public boolean createQueue(final String name) throws Exception
{
- boolean created = server.createQueue(name, jndiBinding, null, true);
+ boolean created = server.createQueue(name, null, true);
if (created)
{
sendNotification(NotificationType.QUEUE_CREATED, name);
@@ -247,9 +247,9 @@
return destroyed;
}
- public boolean createTopic(final String topicName, final String jndiBinding) throws Exception
+ public boolean createTopic(final String topicName) throws Exception
{
- boolean created = server.createTopic(topicName, jndiBinding);
+ boolean created = server.createTopic(topicName);
if (created)
{
sendNotification(NotificationType.TOPIC_CREATED, topicName);
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -53,8 +53,6 @@
private final HornetQDestination managedTopic;
- private final String binding;
-
private final AddressControl addressControl;
private final ManagementService managementService;
@@ -71,13 +69,11 @@
public JMSTopicControlImpl(final HornetQDestination topic,
final AddressControl addressControl,
- final String jndiBinding,
final ManagementService managementService) throws Exception
{
super(TopicControl.class);
managedTopic = topic;
this.addressControl = addressControl;
- binding = jndiBinding;
this.managementService = managementService;
}
@@ -98,11 +94,6 @@
return managedTopic.getAddress();
}
- public String getJNDIBinding()
- {
- return binding;
- }
-
public int getMessageCount()
{
return getMessageCount(DurabilityType.ALL);
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -16,7 +16,6 @@
import java.util.List;
import org.hornetq.core.server.HornetQComponent;
-import org.hornetq.jms.persistence.impl.PersistedJNDIBinding;
/**
* A JMSPersistence
@@ -40,7 +39,7 @@
void storeDestination(PersistedDestination destination) throws Exception;
- void deleteDestination(String name) throws Exception;
+ void deleteDestination(PersistedType type, String name) throws Exception;
List<PersistedDestination> recoverDestinations();
@@ -58,5 +57,11 @@
// Inner classes -------------------------------------------------
- void storeJndiBinding(PersistedJNDIBinding persistedJNDIBinding);
+ void addJNDI(PersistedType type, String name, String address) throws Exception;
+
+ List<PersistedJNDI> recoverPersistedJNDI() throws Exception;
+
+ void deleteJNDI(PersistedType type, String name, String address) throws Exception;
+
+ void deleteJNDI(PersistedType type, String name) throws Exception;
}
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -15,7 +15,6 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.jms.persistence.impl.DestinationType;
import org.hornetq.utils.BufferHelper;
import org.hornetq.utils.DataConstants;
@@ -34,12 +33,10 @@
private long id;
- private DestinationType type;
+ private PersistedType type;
private String name;
- private String jndiBinding;
-
private String selector;
private boolean durable;
@@ -50,17 +47,16 @@
public PersistedDestination()
{
}
-
- public PersistedDestination(final DestinationType type, final String name, final String jndiBinding)
+
+ public PersistedDestination(final PersistedType type, final String name)
{
- this(type, name, jndiBinding, null, false);
+ this(type, name, null, true);
}
- public PersistedDestination(final DestinationType type, final String name, final String jndiBinding, final String selector, final boolean durable)
+ public PersistedDestination(final PersistedType type, final String name, final String selector, final boolean durable)
{
this.type = type;
this.name = name;
- this.jndiBinding = jndiBinding;
this.selector = selector;
this.durable = durable;
}
@@ -90,13 +86,8 @@
return name;
}
- public String getJndiBinding()
+ public PersistedType getType()
{
- return jndiBinding;
- }
-
- public DestinationType getType()
- {
return type;
}
@@ -112,27 +103,24 @@
public int getEncodeSize()
{
- return DataConstants.SIZE_INT +
+ return DataConstants.SIZE_BYTE +
BufferHelper.sizeOfSimpleString(name) +
- BufferHelper.sizeOfSimpleString(jndiBinding) +
BufferHelper.sizeOfNullableSimpleString(selector) +
DataConstants.SIZE_BOOLEAN;
}
public void encode(final HornetQBuffer buffer)
{
- buffer.writeInt(type.getType());
+ buffer.writeByte(type.getType());
buffer.writeString(name);
- buffer.writeString(jndiBinding);
buffer.writeNullableString(selector);
buffer.writeBoolean(durable);
}
public void decode(final HornetQBuffer buffer)
{
- type = DestinationType.getType(buffer.readInt());
+ type = PersistedType.getType(buffer.readByte());
name = buffer.readString();
- jndiBinding = buffer.readString();
selector = buffer.readNullableString();
durable = buffer.readBoolean();
}
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedJNDI.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedJNDI.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedJNDI.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.persistence;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.utils.BufferHelper;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A PersistedJNDI
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PersistedJNDI implements EncodingSupport
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long id;
+
+ private PersistedType type;
+
+ private String name;
+
+ private ArrayList<String> jndi = new ArrayList<String>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PersistedJNDI()
+ {
+ }
+
+ /**
+ * @param type
+ * @param name
+ */
+ public PersistedJNDI(PersistedType type, String name)
+ {
+ super();
+ this.type = type;
+ this.name = name;
+ }
+
+ // Public --------------------------------------------------------
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ type = PersistedType.getType(buffer.readByte());
+ name = buffer.readSimpleString().toString();
+ int jndiArraySize = buffer.readInt();
+ jndi = new ArrayList<String>(jndiArraySize);
+
+ for (int i = 0 ; i < jndiArraySize; i++)
+ {
+ jndi.add(buffer.readSimpleString().toString());
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeByte(type.getType());
+ BufferHelper.writeAsSimpleString(buffer, name);
+ buffer.writeInt(jndi.size());
+ for (String jndiEl : jndi)
+ {
+ BufferHelper.writeAsSimpleString(buffer, jndiEl);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_BYTE +
+ BufferHelper.sizeOfSimpleString(name) +
+ sizeOfJNDI();
+ }
+
+ private int sizeOfJNDI()
+ {
+ int size = DataConstants.SIZE_INT; // for the number of elements written
+
+ for (String str : jndi)
+ {
+ size += BufferHelper.sizeOfSimpleString(str);
+ }
+
+ return size;
+ }
+
+ /**
+ * @return the id
+ */
+ public long getId()
+ {
+ return id;
+ }
+
+ /**
+ * @param id the id to set
+ */
+ public void setId(long id)
+ {
+ this.id = id;
+ }
+
+ /**
+ * @return the type
+ */
+ public PersistedType getType()
+ {
+ return type;
+ }
+
+ /**
+ * @return the name
+ */
+ public String getName()
+ {
+ return name;
+ }
+
+ /**
+ * @return the jndi
+ */
+ public List<String> getJndi()
+ {
+ return jndi;
+ }
+
+ public void addJNDI(String address)
+ {
+ jndi.add(address);
+ }
+
+ public void deleteJNDI(String address)
+ {
+ jndi.remove(address);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedType.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedType.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedType.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.persistence;
+
+/**
+ * A PersistedType
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public enum PersistedType
+{
+ ConnectionFactory, Topic, Queue;
+
+ public byte getType()
+ {
+ switch (this)
+ {
+ case ConnectionFactory: return 0;
+ case Topic: return 1;
+ case Queue: return 2;
+ default: return -1;
+ }
+ }
+
+ public static PersistedType getType(byte type)
+ {
+ switch (type)
+ {
+ case 0: return ConnectionFactory;
+ case 1: return Topic;
+ case 2: return Queue;
+ default: return null;
+ }
+ }
+}
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.journal.Journal;
@@ -35,6 +36,8 @@
import org.hornetq.jms.persistence.JMSStorageManager;
import org.hornetq.jms.persistence.PersistedConnectionFactory;
import org.hornetq.jms.persistence.PersistedDestination;
+import org.hornetq.jms.persistence.PersistedJNDI;
+import org.hornetq.jms.persistence.PersistedType;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.utils.IDGenerator;
@@ -54,6 +57,8 @@
private final byte DESTINATION_RECORD = 2;
+ private final byte JNDI_RECORD = 3;
+
// Attributes ----------------------------------------------------
private final IDGenerator idGenerator;
@@ -68,7 +73,9 @@
private Map<String, PersistedConnectionFactory> mapFactories = new ConcurrentHashMap<String, PersistedConnectionFactory>();
- private Map<String, PersistedDestination> destinations = new ConcurrentHashMap<String, PersistedDestination>();
+ private Map<Pair<PersistedType, String>, PersistedDestination> destinations = new ConcurrentHashMap<Pair<PersistedType, String>, PersistedDestination>();
+
+ private Map<Pair<PersistedType, String>, PersistedJNDI> mapJNDI = new ConcurrentHashMap<Pair<PersistedType, String>, PersistedJNDI>();
// Static --------------------------------------------------------
@@ -162,20 +169,106 @@
*/
public void storeDestination(final PersistedDestination destination) throws Exception
{
- deleteDestination(destination.getName());
+ deleteDestination(destination.getType(), destination.getName());
long id = idGenerator.generateID();
destination.setId(id);
jmsJournal.appendAddRecord(id, DESTINATION_RECORD, destination, true);
- destinations.put(destination.getName(), destination);
+ destinations.put(new Pair<PersistedType, String>(destination.getType(), destination.getName()), destination);
}
+
+ public List<PersistedJNDI> recoverPersistedJNDI() throws Exception
+ {
+ ArrayList<PersistedJNDI> list = new ArrayList<PersistedJNDI>();
+
+ list.addAll(mapJNDI.values());
+
+ return list;
+ }
+
+ public void addJNDI(PersistedType type, String name, String address) throws Exception
+ {
+ Pair<PersistedType, String> key = new Pair<PersistedType, String>(type, name);
- public void deleteDestination(final String name) throws Exception
+ long tx = idGenerator.generateID();
+
+ PersistedJNDI currentJNDI = mapJNDI.get(key);
+ if (currentJNDI != null)
+ {
+ jmsJournal.appendDeleteRecordTransactional(tx, currentJNDI.getId());
+ }
+ else
+ {
+ currentJNDI = new PersistedJNDI(type, name);
+ }
+
+ currentJNDI.addJNDI(address);
+
+ long newId = idGenerator.generateID();
+
+ currentJNDI.setId(newId);
+
+ jmsJournal.appendAddRecordTransactional(tx, newId, JNDI_RECORD, currentJNDI);
+
+ jmsJournal.appendCommitRecord(tx, true);
+ }
+
+ public void deleteJNDI(PersistedType type, String name, String address) throws Exception
{
- PersistedDestination destination = destinations.get(name);
+ Pair<PersistedType, String> key = new Pair<PersistedType, String>(type, name);
+
+ long tx = idGenerator.generateID();
+
+ PersistedJNDI currentJNDI = mapJNDI.get(key);
+ if (currentJNDI == null)
+ {
+ return;
+ }
+ else
+ {
+ jmsJournal.appendDeleteRecordTransactional(tx, currentJNDI.getId());
+ }
+
+ currentJNDI.deleteJNDI(address);
+
+ if (currentJNDI.getJndi().size() == 0)
+ {
+ mapJNDI.remove(key);
+ }
+ else
+ {
+ long newId = idGenerator.generateID();
+ currentJNDI.setId(newId);
+ jmsJournal.appendAddRecordTransactional(tx, newId, JNDI_RECORD, currentJNDI);
+ }
+
+ jmsJournal.appendCommitRecord(tx, true);
+ }
+
+
+ public void deleteJNDI(PersistedType type, String name) throws Exception
+ {
+ Pair<PersistedType, String> key = new Pair<PersistedType, String>(type, name);
+
+ PersistedJNDI currentJNDI = mapJNDI.remove(key);
+
+ if (currentJNDI == null)
+ {
+ return;
+ }
+ else
+ {
+ jmsJournal.appendDeleteRecord(currentJNDI.getId(), true);
+ }
+ }
+
+ public void deleteDestination(final PersistedType type, final String name) throws Exception
+ {
+ PersistedDestination destination = destinations.get(new Pair<PersistedType, String>(type, name));
if(destination != null)
{
jmsJournal.appendDeleteRecord(destination.getId(), false);
}
+ deleteJNDI(type, name);
}
/* (non-Javadoc)
@@ -247,8 +340,16 @@
PersistedDestination destination = new PersistedDestination();
destination.decode(buffer);
destination.setId(id);
- destinations.put(destination.getName(), destination);
+ destinations.put(new Pair<PersistedType, String>(destination.getType(), destination.getName()), destination);
}
+ else if (rec == JNDI_RECORD)
+ {
+ PersistedJNDI jndi = new PersistedJNDI();
+ jndi.decode(buffer);
+ jndi.setId(id);
+ Pair<PersistedType, String> key = new Pair<PersistedType, String>(jndi.getType(), jndi.getName());
+ mapJNDI.put(key, jndi);
+ }
else
{
throw new IllegalStateException("Invalid record type " + rec);
@@ -278,6 +379,7 @@
}
}
+
// Inner classes -------------------------------------------------
}
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -19,6 +19,8 @@
import org.hornetq.jms.persistence.JMSStorageManager;
import org.hornetq.jms.persistence.PersistedConnectionFactory;
import org.hornetq.jms.persistence.PersistedDestination;
+import org.hornetq.jms.persistence.PersistedJNDI;
+import org.hornetq.jms.persistence.PersistedType;
/**
* A NullJMSStorageManagerImpl
@@ -98,6 +100,47 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#addJNDI(org.hornetq.jms.persistence.PersistedType, java.lang.String, java.lang.String)
+ */
+ public void addJNDI(PersistedType type, String name, String address) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#deleteJNDI(org.hornetq.jms.persistence.PersistedType, java.lang.String, java.lang.String)
+ */
+ public void deleteJNDI(PersistedType type, String name, String address) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#deleteDestination(org.hornetq.jms.persistence.PersistedType, java.lang.String)
+ */
+ public void deleteDestination(PersistedType type, String name) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#deleteJNDI(org.hornetq.jms.persistence.PersistedType, java.lang.String)
+ */
+ public void deleteJNDI(PersistedType type, String name) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#recoverPersistedJNDI()
+ */
+ public List<PersistedJNDI> recoverPersistedJNDI() throws Exception
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -49,8 +49,6 @@
*
* @param queueName
* The name of the queue to create
- * @param jndiBinding
- * the name of the binding for JNDI
* @param selectorString
* @param durable
* @return true if the queue is created or if it existed and was added to
@@ -58,7 +56,9 @@
* @throws Exception
* if problems were encountered creating the queue.
*/
- boolean createQueue(String queueName, String jndiBinding, String selectorString, boolean durable) throws Exception;
+ boolean createQueue(String queueName, String selectorString, boolean durable, String ...jndi) throws Exception;
+
+ boolean addQueueToJndi(final String queueName, final String jndiBinding) throws Exception;
/**
* Creates a JMS Topic
@@ -72,10 +72,10 @@
* @throws Exception
* if a problem occurred creating the topic
*/
- boolean createTopic(String topicName, String jndiBinding) throws Exception;
+ boolean createTopic(String topicName, String ... jndi) throws Exception;
/**
- * Remove the destination from JNDI.
+ * Remove the topic from JNDI.
* Calling this method does <em>not</em> destroy the destination.
*
* @param name
@@ -84,9 +84,45 @@
* @throws Exception
* if a problem occurred removing the destination
*/
- boolean undeployDestination(String name) throws Exception;
+ boolean removeTopicFromJNDI(String name, String jndi) throws Exception;
/**
+ * Remove the topic from JNDI.
+ * Calling this method does <em>not</em> destroy the destination.
+ *
+ * @param name
+ * the name of the destination to remove from JNDI
+ * @return true if removed
+ * @throws Exception
+ * if a problem occurred removing the destination
+ */
+ boolean removeTopicFromJNDI(String name) throws Exception;
+
+ /**
+ * Remove the queue from JNDI.
+ * Calling this method does <em>not</em> destroy the destination.
+ *
+ * @param name
+ * the name of the destination to remove from JNDI
+ * @return true if removed
+ * @throws Exception
+ * if a problem occurred removing the destination
+ */
+ boolean removeQueueFromJNDI(String name, String jndi) throws Exception;
+
+ /**
+ * Remove the queue from JNDI.
+ * Calling this method does <em>not</em> destroy the destination.
+ *
+ * @param name
+ * the name of the destination to remove from JNDI
+ * @return true if removed
+ * @throws Exception
+ * if a problem occurred removing the destination
+ */
+ boolean removeQueueFromJNDI(String name) throws Exception;
+
+ /**
* destroys a queue and removes it from JNDI
*
* @param name
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -13,8 +13,6 @@
package org.hornetq.jms.server.impl;
-import java.util.ArrayList;
-
import org.hornetq.core.config.Configuration;
import org.hornetq.core.deployers.DeploymentManager;
import org.hornetq.core.deployers.impl.XmlDeployer;
@@ -39,7 +37,7 @@
private final JMSServerConfigParser parser;
- private final JMSServerManager jmsServerControl;
+ private final JMSServerManager jmsServerManager;
protected static final String CONNECTOR_REF_ELEMENT = "connector-ref";
@@ -67,7 +65,7 @@
{
super(deploymentManager);
- jmsServerControl = jmsServerManager;
+ this.jmsServerManager = jmsServerManager;
configuration = config;
@@ -139,17 +137,17 @@
if (node.getNodeName().equals(JMSServerDeployer.CONNECTION_FACTORY_NODE_NAME))
{
String cfName = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
- jmsServerControl.destroyConnectionFactory(cfName);
+ jmsServerManager.destroyConnectionFactory(cfName);
}
else if (node.getNodeName().equals(JMSServerDeployer.QUEUE_NODE_NAME))
{
String queueName = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
- jmsServerControl.undeployDestination(queueName);
+ jmsServerManager.removeQueueFromJNDI(queueName);
}
else if (node.getNodeName().equals(JMSServerDeployer.TOPIC_NODE_NAME))
{
String topicName = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
- jmsServerControl.undeployDestination(topicName);
+ jmsServerManager.removeTopicFromJNDI(topicName);
}
}
@@ -171,7 +169,7 @@
TopicConfiguration topicConfig = parser.parseTopicConfiguration(node);
for (String jndi : topicConfig.getBindings())
{
- jmsServerControl.createTopic(topicConfig.getName(), jndi);
+ jmsServerManager.createTopic(topicConfig.getName(), jndi);
}
}
@@ -182,10 +180,7 @@
private void deployQueue(final Node node) throws Exception
{
JMSQueueConfiguration queueconfig = parser.parseQueueConfiguration(node);
- for (String jndiName : queueconfig.getBindings())
- {
- jmsServerControl.createQueue(queueconfig.getName(), jndiName, queueconfig.getSelector(), queueconfig.isDurable());
- }
+ jmsServerManager.createQueue(queueconfig.getName(), queueconfig.getSelector(), queueconfig.isDurable(), queueconfig.getBindings());
}
/**
@@ -195,7 +190,7 @@
private void deployConnectionFactory(final Node node) throws Exception
{
ConnectionFactoryConfiguration cfConfig = parser.parseConnectionFactoryConfiguration(node);
- jmsServerControl.createConnectionFactory(cfConfig);
+ jmsServerManager.createConnectionFactory(cfConfig);
}
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -45,8 +45,8 @@
import org.hornetq.jms.persistence.JMSStorageManager;
import org.hornetq.jms.persistence.PersistedConnectionFactory;
import org.hornetq.jms.persistence.PersistedDestination;
-import org.hornetq.jms.persistence.impl.DestinationType;
-import org.hornetq.jms.persistence.impl.PersistedJNDIBinding;
+import org.hornetq.jms.persistence.PersistedJNDI;
+import org.hornetq.jms.persistence.PersistedType;
import org.hornetq.jms.persistence.impl.journal.JournalJMSStorageManagerImpl;
import org.hornetq.jms.persistence.impl.nullpm.NullJMSStorageManagerImpl;
import org.hornetq.jms.server.JMSServerManager;
@@ -85,14 +85,18 @@
*/
private Context context;
- private Map<String, HornetQDestination> destinations = new HashMap<String, HornetQDestination>();
+ private Map<String, HornetQDestination> queues = new HashMap<String, HornetQDestination>();
- private final Map<String, List<String>> destinationBindings = new HashMap<String, List<String>>();
+ private Map<String, HornetQDestination> topics = new HashMap<String, HornetQDestination>();
private final Map<String, HornetQConnectionFactory> connectionFactories = new HashMap<String, HornetQConnectionFactory>();
- private final Map<String, List<String>> connectionFactoryBindings = new HashMap<String, List<String>>();
+ private final Map<String, List<String>> queueJNDI = new HashMap<String, List<String>>();
+ private final Map<String, List<String>> topicJNDI = new HashMap<String, List<String>>();
+
+ private final Map<String, List<String>> connectionFactoryJNDI = new HashMap<String, List<String>>();
+
private final HornetQServer server;
private JMSManagementService jmsManagementService;
@@ -110,15 +114,15 @@
private boolean contextSet;
private JMSConfiguration config;
-
+
private Configuration coreConfig;
-
+
private JMSStorageManager storage;
public JMSServerManagerImpl(final HornetQServer server) throws Exception
{
this.server = server;
-
+
configFileName = null;
}
@@ -144,7 +148,7 @@
{
active = true;
- jmsManagementService = new JMSManagementServiceImpl(server.getManagementService());
+ jmsManagementService = new JMSManagementServiceImpl(server.getManagementService(), this);
try
{
@@ -171,7 +175,7 @@
{
deploy();
}
-
+
initJournal();
}
catch (Exception e)
@@ -220,19 +224,19 @@
deploymentManager.stop();
}
- for (String destination : destinationBindings.keySet())
- {
- undeployDestination(destination);
- }
+ // for (String destination : destinationBindings.keySet())
+ // {
+ // undeployDestination(destination);
+ // }
for (String connectionFactory : new HashSet<String>(connectionFactories.keySet()))
{
destroyConnectionFactory(connectionFactory);
}
- destinationBindings.clear();
+ // destinationBindings.clear();
connectionFactories.clear();
- connectionFactoryBindings.clear();
+ connectionFactoryJNDI.clear();
if (context != null)
{
@@ -294,112 +298,173 @@
return server.getHornetQServerControl().getVersion();
}
- public synchronized boolean createQueue(final String queueName,
- final String jndiBinding,
- final String selectorString,
- final boolean durable) throws Exception
+ public synchronized boolean createQueue(final String queueName, final String selectorString, final boolean durable, final String ... jndi) throws Exception
{
checkInitialised();
- boolean added = internalCreateQueue(queueName, jndiBinding, selectorString, durable);
+ boolean added = internalCreateQueue(queueName, selectorString, durable);
- storage.storeDestination(new PersistedDestination(DestinationType.QUEUE, queueName, jndiBinding, selectorString, durable));
+ storage.storeDestination(new PersistedDestination(PersistedType.Queue, queueName, selectorString, durable));
+
+ for (String jndiItem : jndi)
+ {
+ addQueueToJndi(queueName, jndiItem);
+ }
return added;
}
- public synchronized boolean createTopic(final String topicName, final String jndiBinding) throws Exception
+ public synchronized boolean createTopic(final String topicName, final String ... jndi) throws Exception
{
checkInitialised();
- boolean added = internalCreateTopic(topicName, jndiBinding);
+ boolean added = internalCreateTopic(topicName);
- storage.storeDestination(new PersistedDestination(DestinationType.TOPIC, topicName, jndiBinding));
+ storage.storeDestination(new PersistedDestination(PersistedType.Topic, topicName));
+
+ for (String jndiItem : jndi)
+ {
+ addQueueToJndi(topicName, jndiItem);
+ }
return added;
}
- public boolean addTopicToJndi(final String topicName, final String jndiBinding) throws NamingException
+ public boolean addTopicToJndi(final String topicName, final String jndiBinding) throws Exception
{
- HornetQDestination destination = destinations.get(topicName);
- if(destination == null)
+ checkInitialised();
+
+ HornetQDestination destination = topics.get(topicName);
+ if (destination == null)
{
throw new IllegalArgumentException("Topic does not exist");
}
- if(destination.getTopicName() == null)
+ if (destination.getTopicName() == null)
{
throw new IllegalArgumentException(topicName + " is not a topic");
}
boolean added = bindToJndi(jndiBinding, destination);
if (added)
{
- addToDestinationBindings(topicName, jndiBinding);
- storage.storeJndiBinding(new PersistedJNDIBinding(DestinationType.TOPIC, topicName, jndiBinding));
+ addToBindings(topicJNDI, topicName, jndiBinding);
+ storage.addJNDI(PersistedType.Topic, topicName, jndiBinding);
}
return added;
}
- public boolean addQueueToJndi(final String queueName, final String jndiBinding) throws NamingException
+ public boolean addQueueToJndi(final String queueName, final String jndiBinding) throws Exception
{
- HornetQDestination destination = destinations.get(queueName);
- if(destination == null)
+ checkInitialised();
+
+ HornetQDestination destination = queues.get(queueName);
+ if (destination == null)
{
throw new IllegalArgumentException("Queue does not exist");
}
- if(destination.getQueueName() == null)
+ if (destination.getQueueName() == null)
{
throw new IllegalArgumentException(queueName + " is not a queue");
}
boolean added = bindToJndi(jndiBinding, destination);
if (added)
{
- addToDestinationBindings(queueName, jndiBinding);
- storage.storeJndiBinding(new PersistedJNDIBinding(DestinationType.QUEUE, queueName, jndiBinding));
+ addToBindings(queueJNDI, queueName, jndiBinding);
+ storage.addJNDI(PersistedType.Queue, queueName, jndiBinding);
}
return added;
}
- public synchronized boolean undeployDestination(final String name) throws Exception
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.server.JMSServerManager#removeQueueFromJNDI(java.lang.String, java.lang.String)
+ */
+ public boolean removeQueueFromJNDI(String name, String jndi) throws Exception
{
checkInitialised();
- List<String> jndiBindings = destinationBindings.get(name);
- if (jndiBindings == null || jndiBindings.size() == 0)
- {
- return false;
- }
- if (context != null)
- {
- Iterator<String> iter = jndiBindings.iterator();
- while (iter.hasNext())
- {
- String jndiBinding = iter.next();
- context.unbind(jndiBinding);
- iter.remove();
- }
- }
+
+ removeFromJNDI(queueJNDI, name, jndi);
+
+ storage.deleteJNDI(PersistedType.Queue, name, jndi);
+
return true;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.server.JMSServerManager#removeQueueFromJNDI(java.lang.String, java.lang.String)
+ */
+ public boolean removeQueueFromJNDI(String name) throws Exception
+ {
+ checkInitialised();
+
+ removeFromJNDI(queueJNDI, name);
+
+ storage.deleteJNDI(PersistedType.Queue, name);
+
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.server.JMSServerManager#removeTopicFromJNDI(java.lang.String, java.lang.String)
+ */
+ public boolean removeTopicFromJNDI(String name, String jndi) throws Exception
+ {
+ checkInitialised();
+
+ removeFromJNDI(topicJNDI, name, jndi);
+
+ storage.deleteJNDI(PersistedType.Topic, name, jndi);
+
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.server.JMSServerManager#removeTopicFromJNDI(java.lang.String, java.lang.String)
+ */
+ public boolean removeTopicFromJNDI(String name) throws Exception
+ {
+ checkInitialised();
+
+ removeFromJNDI(topicJNDI, name);
+
+ storage.deleteJNDI(PersistedType.Topic, name);
+
+ return true;
+ }
+
+
public synchronized boolean destroyQueue(final String name) throws Exception
{
checkInitialised();
- undeployDestination(name);
+
+ removeFromJNDI(queueJNDI, name);
+
- destinationBindings.remove(name);
+ queues.remove(name);
+ queueJNDI.remove(name);
+
jmsManagementService.unregisterQueue(name);
+
server.getHornetQServerControl().destroyQueue(HornetQDestination.createQueueAddressFromName(name).toString());
- storage.deleteDestination(name);
+
+
+ storage.deleteDestination(PersistedType.Queue, name);
+
return true;
}
public synchronized boolean destroyTopic(final String name) throws Exception
{
checkInitialised();
- undeployDestination(name);
+
+ removeFromJNDI(topicJNDI, name);
- destinationBindings.remove(name);
+ topics.remove(name);
+ topicJNDI.remove(name);
+
jmsManagementService.unregisterTopic(name);
- AddressControl addressControl = (AddressControl)server.getManagementService().getResource(ResourceNames.CORE_ADDRESS + HornetQDestination.createTopicAddressFromName(name));
+
+ AddressControl addressControl = (AddressControl)server.getManagementService()
+ .getResource(ResourceNames.CORE_ADDRESS + HornetQDestination.createTopicAddressFromName(name));
if (addressControl != null)
{
for (String queueName : addressControl.getQueueNames())
@@ -407,10 +472,13 @@
Binding binding = server.getPostOffice().getBinding(new SimpleString(queueName));
if (binding == null)
{
- log.warn("Queue " + queueName + " doesn't exist on the topic " + name + ". It was deleted manually probably.");
+ log.warn("Queue " + queueName +
+ " doesn't exist on the topic " +
+ name +
+ ". It was deleted manually probably.");
continue;
}
-
+
// We can't remove the remote binding. As this would be the bridge associated with the topic on this case
if (binding.getType() != BindingType.REMOTE_QUEUE)
{
@@ -418,7 +486,7 @@
}
}
}
- storage.deleteDestination(name);
+ storage.deleteDestination(PersistedType.Topic, name);
return true;
}
@@ -634,12 +702,9 @@
storage.storeConnectionFactory(new PersistedConnectionFactory(cfConfig));
}
- private boolean internalCreateQueue(final String queueName,
- final String jndiBinding,
- final String selectorString,
- final boolean durable) throws Exception
+ private boolean internalCreateQueue(final String queueName, final String selectorString, final boolean durable) throws Exception
{
- HornetQDestination jBossQueue = HornetQDestination.createQueue(queueName);
+ HornetQDestination hqQueue = HornetQDestination.createQueue(queueName);
// Convert from JMS selector to core filter
String coreFilterString = null;
@@ -649,42 +714,42 @@
coreFilterString = SelectorTranslator.convertToHornetQFilterString(selectorString);
}
- server.getHornetQServerControl().deployQueue(jBossQueue.getAddress(),
- jBossQueue.getAddress(),
+ server.getHornetQServerControl().deployQueue(hqQueue.getAddress(),
+ hqQueue.getAddress(),
coreFilterString,
durable);
+
+ queues.put(queueName, hqQueue);
- boolean added = bindToJndi(jndiBinding, jBossQueue);
+ jmsManagementService.registerQueue(hqQueue);
- if (added)
- {
- addToDestinationBindings(queueName, jndiBinding);
- }
-
- jmsManagementService.registerQueue(jBossQueue, jndiBinding);
-
- return added;
+ return true;
}
- private boolean internalCreateTopic(final String topicName, final String jndiBinding) throws Exception
+ /**
+ * Performs the internal creation without activating any storage.
+ * The storage load will call this method
+ * @param topicName
+ * @return
+ * @throws Exception
+ */
+ private boolean internalCreateTopic(final String topicName) throws Exception
{
- HornetQDestination jBossTopic = HornetQDestination.createTopic(topicName);
+ HornetQDestination hqTopic = HornetQDestination.createTopic(topicName);
// We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS
// checks when routing messages to a topic that
// does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
// subscriptions - core has no notion of a topic
- server.getHornetQServerControl().deployQueue(jBossTopic.getAddress(),
- jBossTopic.getAddress(),
+ server.getHornetQServerControl().deployQueue(hqTopic.getAddress(),
+ hqTopic.getAddress(),
JMSServerManagerImpl.REJECT_FILTER,
true);
- boolean added = bindToJndi(jndiBinding, jBossTopic);
- if (added)
- {
- addToDestinationBindings(topicName, jndiBinding);
- }
- jmsManagementService.registerTopic(jBossTopic, jndiBinding);
- return added;
+ topics.put(topicName, hqTopic);
+
+ jmsManagementService.registerTopic(hqTopic);
+
+ return true;
}
/**
@@ -842,7 +907,7 @@
public synchronized boolean destroyConnectionFactory(final String name) throws Exception
{
checkInitialised();
- List<String> jndiBindings = connectionFactoryBindings.get(name);
+ List<String> jndiBindings = connectionFactoryJNDI.get(name);
if (jndiBindings == null || jndiBindings.size() == 0)
{
return false;
@@ -861,7 +926,7 @@
}
}
}
- connectionFactoryBindings.remove(name);
+ connectionFactoryJNDI.remove(name);
connectionFactories.remove(name);
jmsManagementService.unregisterConnectionFactory(name);
@@ -897,6 +962,7 @@
checkInitialised();
return server.getHornetQServerControl().listSessions(connectionID);
}
+
// Public --------------------------------------------------------
// Private -------------------------------------------------------
@@ -917,15 +983,22 @@
{
bindToJndi(jndiBinding, cf);
- if (connectionFactoryBindings.get(name) == null)
- {
- connectionFactoryBindings.put(name, new ArrayList<String>());
- }
- connectionFactoryBindings.get(name).add(jndiBinding);
+ addToBindings(connectionFactoryJNDI, name, jndiBinding);
}
- jmsManagementService.registerConnectionFactory(name, cf, jndiBindings);
+ jmsManagementService.registerConnectionFactory(name, cf);
}
+
+ private void addToBindings(Map<String, List<String>> map, String name, String jndi)
+ {
+ List<String> list = map.get(name);
+ if (list == null)
+ {
+ list = new ArrayList<String>();
+ map.put(name, list);
+ }
+ list.add(jndi);
+ }
private boolean bindToJndi(final String jndiName, final Object objectToBind) throws NamingException
{
@@ -992,15 +1065,6 @@
}
}
- private void addToDestinationBindings(final String destination, final String jndiBinding)
- {
- if (destinationBindings.get(destination) == null)
- {
- destinationBindings.put(destination, new ArrayList<String>());
- }
- destinationBindings.get(destination).add(jndiBinding);
- }
-
private void deploy() throws Exception
{
if (config == null)
@@ -1023,9 +1087,10 @@
for (JMSQueueConfiguration config : queueConfigs)
{
String[] bindings = config.getBindings();
+ createQueue(config.getName(), config.getSelector(), config.isDurable());
for (String binding : bindings)
{
- createQueue(config.getName(), binding, config.getSelector(), config.isDurable());
+ addQueueToJndi(config.getName(), binding);
}
}
@@ -1033,9 +1098,11 @@
for (TopicConfiguration config : topicConfigs)
{
String[] bindings = config.getBindings();
+ createTopic(config.getName());
+
for (String binding : bindings)
{
- createTopic(config.getName(), binding);
+ addTopicToJndi(config.getName(), binding);
}
}
}
@@ -1046,7 +1113,7 @@
private void initJournal() throws Exception
{
this.coreConfig = server.getConfiguration();
-
+
if (coreConfig.isPersistenceEnabled())
{
// TODO: replication
@@ -1056,12 +1123,11 @@
{
storage = new NullJMSStorageManagerImpl();
}
-
+
storage.start();
-
-
+
List<PersistedConnectionFactory> cfs = storage.recoverConnectionFactories();
-
+
for (PersistedConnectionFactory cf : cfs)
{
internalCreateCF(cf.getConfig());
@@ -1071,18 +1137,104 @@
for (PersistedDestination destination : destinations)
{
- if(destination.getType() == DestinationType.QUEUE)
+ if (destination.getType() == PersistedType.Queue)
{
- internalCreateQueue(destination.getName(), destination.getJndiBinding(), destination.getSelector(), destination.isDurable());
+ internalCreateQueue(destination.getName(),
+ destination.getSelector(),
+ destination.isDurable());
}
- else if(destination.getType() == DestinationType.TOPIC)
+ else if (destination.getType() == PersistedType.Topic)
{
- internalCreateTopic(destination.getName(), destination.getJndiBinding());
+ internalCreateTopic(destination.getName());
}
}
+
+ List<PersistedJNDI> jndiSpace = storage.recoverPersistedJNDI();
+ for (PersistedJNDI record : jndiSpace)
+ {
+ Map<String, List<String>> mapJNDI;
+ Map<String, ?> objects;
+
+ switch (record.getType())
+ {
+ case Queue:
+ mapJNDI = queueJNDI;
+ objects = queues;
+ break;
+ case Topic:
+ mapJNDI = topicJNDI;
+ objects = topics;
+ break;
+ default:
+ case ConnectionFactory:
+ mapJNDI = connectionFactoryJNDI;
+ objects = connectionFactories;
+ break;
+ }
+
+ Object objectToBind = objects.get(record.getName());
+
+ if (objectToBind == null)
+ {
+ continue;
+ }
+
+ List<String> jndiList = mapJNDI.get(record.getName());
+ if (jndiList == null)
+ {
+ jndiList = new ArrayList<String>();
+ mapJNDI.put(record.getName(), jndiList);
+ }
+
+
+ for (String jndi : record.getJndi())
+ {
+ jndiList.add(jndi);
+ bindToJndi(jndi, objectToBind);
+ }
+ }
}
-
-
+
+ private synchronized boolean removeFromJNDI(final Map<String, List<String>> jndiMap, final String name) throws Exception
+ {
+ checkInitialised();
+ List<String> jndiBindings = jndiMap.get(name);
+ if (jndiBindings == null || jndiBindings.size() == 0)
+ {
+ return false;
+ }
+ if (context != null)
+ {
+ Iterator<String> iter = jndiBindings.iterator();
+ while (iter.hasNext())
+ {
+ String jndiBinding = iter.next();
+ context.unbind(jndiBinding);
+ iter.remove();
+ }
+ }
+ return true;
+ }
+
+
+ private synchronized boolean removeFromJNDI(final Map<String, List<String>> jndiMap, final String name, final String jndi) throws Exception
+ {
+ checkInitialised();
+ List<String> jndiBindings = jndiMap.get(name);
+ if (jndiBindings == null || jndiBindings.size() == 0)
+ {
+ return false;
+ }
+ if (context != null)
+ {
+ if (jndiBindings.remove(jndi))
+ {
+ context.unbind(jndi);
+ }
+ }
+ return true;
+ }
+
/**
* @param cfConfig
* @return
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/management/JMSManagementService.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/management/JMSManagementService.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/management/JMSManagementService.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -13,8 +13,6 @@
package org.hornetq.jms.server.management;
-import java.util.List;
-
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
@@ -32,15 +30,15 @@
void unregisterJMSServer() throws Exception;
- void registerQueue(HornetQDestination queue, String jndiBinding) throws Exception;
+ void registerQueue(HornetQDestination queue) throws Exception;
void unregisterQueue(String name) throws Exception;
- void registerTopic(HornetQDestination topic, String jndiBinding) throws Exception;
+ void registerTopic(HornetQDestination topic) throws Exception;
void unregisterTopic(String name) throws Exception;
- void registerConnectionFactory(String name, HornetQConnectionFactory connectionFactory, List<String> bindings) throws Exception;
+ void registerConnectionFactory(String name, HornetQConnectionFactory connectionFactory) throws Exception;
void unregisterConnectionFactory(String name) throws Exception;
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -49,12 +49,15 @@
// Attributes ----------------------------------------------------
private final ManagementService managementService;
+
+ private final JMSServerManager jmsServerManager;
// Static --------------------------------------------------------
- public JMSManagementServiceImpl(final ManagementService managementService)
+ public JMSManagementServiceImpl(final ManagementService managementService, final JMSServerManager jmsServerManager)
{
this.managementService = managementService;
+ this.jmsServerManager = jmsServerManager;
}
// Public --------------------------------------------------------
@@ -77,7 +80,7 @@
managementService.unregisterFromRegistry(ResourceNames.JMS_SERVER);
}
- public synchronized void registerQueue(final HornetQDestination queue, final String jndiBinding) throws Exception
+ public synchronized void registerQueue(final HornetQDestination queue) throws Exception
{
QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue.getAddress());
MessageCounterManager messageCounterManager = managementService.getMessageCounterManager();
@@ -89,7 +92,7 @@
messageCounterManager.getMaxDayCount());
messageCounterManager.registerMessageCounter(queue.getName(), counter);
ObjectName objectName = managementService.getObjectNameBuilder().getJMSQueueObjectName(queue.getQueueName());
- JMSQueueControlImpl control = new JMSQueueControlImpl(queue, coreQueueControl, jndiBinding, counter);
+ JMSQueueControlImpl control = new JMSQueueControlImpl(queue, coreQueueControl, jmsServerManager, counter);
managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_QUEUE + queue.getQueueName(), control);
}
@@ -101,11 +104,11 @@
managementService.unregisterFromRegistry(ResourceNames.JMS_QUEUE + name);
}
- public synchronized void registerTopic(final HornetQDestination topic, final String jndiBinding) throws Exception
+ public synchronized void registerTopic(final HornetQDestination topic) throws Exception
{
ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(topic.getTopicName());
AddressControl addressControl = (AddressControl)managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress());
- JMSTopicControlImpl control = new JMSTopicControlImpl(topic, addressControl, jndiBinding, managementService);
+ JMSTopicControlImpl control = new JMSTopicControlImpl(topic, addressControl, managementService);
managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_TOPIC + topic.getTopicName(), control);
}
@@ -118,11 +121,10 @@
}
public synchronized void registerConnectionFactory(final String name,
- final HornetQConnectionFactory connectionFactory,
- final List<String> bindings) throws Exception
+ final HornetQConnectionFactory connectionFactory) throws Exception
{
ObjectName objectName = managementService.getObjectNameBuilder().getConnectionFactoryObjectName(name);
- JMSConnectionFactoryControlImpl control = new JMSConnectionFactoryControlImpl(connectionFactory, name, bindings);
+ JMSConnectionFactoryControlImpl control = new JMSConnectionFactoryControlImpl(connectionFactory, name);
managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_CONNECTION_FACTORY + name, control);
}
Modified: branches/Clebert_TMP/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- branches/Clebert_TMP/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -210,7 +210,7 @@
public void createQueue(final String name, final String jndiName) throws Exception
{
- getJMSServerManager().createQueue(name, "/queue/" + (jndiName != null ? jndiName : name), null, true);
+ getJMSServerManager().createQueue(name, null, true, "/queue/" + (jndiName != null ? jndiName : name));
}
public void createTopic(final String name, final String jndiName) throws Exception
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -150,7 +150,7 @@
{
server = jmsServer1;
}
- server.createQueue(queueName, "/queue/" + queueName, null, true);
+ server.createQueue(queueName, null, true, "/queue/" + queueName);
}
@Override
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -199,7 +199,7 @@
JMSBridgeReconnectionTest.log.info("Restarting server");
jmsServer1.start();
- jmsServer1.createQueue("targetQueue", "queue/targetQueue", null, true);
+ jmsServer1.createQueue("targetQueue", null, true, "queue/targetQueue");
createQueue("targetQueue", 1);
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/connection/ExceptionListenerTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/connection/ExceptionListenerTest.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/connection/ExceptionListenerTest.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -69,7 +69,7 @@
jmsServer = new JMSServerManagerImpl(server);
jmsServer.setContext(new NullInitialContext());
jmsServer.start();
- jmsServer.createQueue(ExceptionListenerTest.Q_NAME, ExceptionListenerTest.Q_NAME, null, true);
+ jmsServer.createQueue(ExceptionListenerTest.Q_NAME, null, true, ExceptionListenerTest.Q_NAME);
cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
cf.setBlockOnDurableSend(true);
cf.setPreAcknowledge(true);
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -49,7 +49,7 @@
{
super.setUp();
- jmsServer.createQueue(ConsumerTest.Q_NAME, ConsumerTest.Q_NAME, null, true);
+ jmsServer.createQueue(ConsumerTest.Q_NAME, null, true, ConsumerTest.Q_NAME);
cf = HornetQJMSClient.createConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
}
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -417,7 +417,7 @@
JMSQueueControl queueControl = createManagementControl();
String expiryQueueName = RandomUtil.randomString();
HornetQDestination expiryQueue = (HornetQDestination)HornetQJMSClient.createQueue(expiryQueueName);
- serverManager.createQueue(expiryQueueName, expiryQueueName, null, true);
+ serverManager.createQueue(expiryQueueName, null, true, expiryQueueName);
queueControl.setExpiryAddress(expiryQueue.getAddress());
JMSQueueControl expiryQueueControl = ManagementControlHelper.createJMSQueueControl(expiryQueue, mbeanServer);
@@ -544,7 +544,7 @@
public void testSendMessageToDeadLetterAddress() throws Exception
{
String deadLetterQueue = RandomUtil.randomString();
- serverManager.createQueue(deadLetterQueue, deadLetterQueue, null, true);
+ serverManager.createQueue(deadLetterQueue, null, true, deadLetterQueue);
HornetQDestination dlq = (HornetQDestination)HornetQJMSClient.createQueue(deadLetterQueue);
Connection conn = createConnection();
@@ -605,7 +605,7 @@
String filter = "key = " + matchingValue;
String deadLetterQueue = RandomUtil.randomString();
- serverManager.createQueue(deadLetterQueue, deadLetterQueue, null, true);
+ serverManager.createQueue(deadLetterQueue, null, true, deadLetterQueue);
HornetQDestination dlq = (HornetQDestination)HornetQJMSClient.createQueue(deadLetterQueue);
Connection conn = createConnection();
@@ -647,7 +647,7 @@
{
String otherQueueName = RandomUtil.randomString();
- serverManager.createQueue(otherQueueName, otherQueueName, null, true);
+ serverManager.createQueue(otherQueueName, null, true, otherQueueName);
HornetQDestination otherQueue = (HornetQDestination)HornetQJMSClient.createQueue(otherQueueName);
// send on queue
@@ -694,7 +694,7 @@
String filter = "key = " + matchingValue;
String otherQueueName = RandomUtil.randomString();
- serverManager.createQueue(otherQueueName, otherQueueName, null, true);
+ serverManager.createQueue(otherQueueName, null, true, otherQueueName);
HornetQDestination otherQueue = (HornetQDestination)HornetQJMSClient.createQueue(otherQueueName);
Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
@@ -729,7 +729,7 @@
{
String otherQueueName = RandomUtil.randomString();
- serverManager.createQueue(otherQueueName, otherQueueName, null, true);
+ serverManager.createQueue(otherQueueName, null, true, otherQueueName);
HornetQDestination otherQueue = (HornetQDestination)HornetQJMSClient.createQueue(otherQueueName);
String[] messageIDs = JMSUtil.sendMessages(queue, 1);
@@ -752,7 +752,7 @@
String unknownMessageID = RandomUtil.randomString();
String otherQueueName = RandomUtil.randomString();
- serverManager.createQueue(otherQueueName, otherQueueName, null, true);
+ serverManager.createQueue(otherQueueName, null, true, otherQueueName);
JMSQueueControl queueControl = createManagementControl();
Assert.assertEquals(0, queueControl.getMessageCount());
@@ -834,7 +834,7 @@
serverManager.activated();
String queueName = RandomUtil.randomString();
- serverManager.createQueue(queueName, queueName, null, true);
+ serverManager.createQueue(queueName, null, true, queueName);
queue = (HornetQDestination)HornetQJMSClient.createQueue(queueName);
}
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -270,6 +270,12 @@
{
return (String)proxy.retrieveAttributeValue("selector");
}
+
+ public void addJNDI(String jndi) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
};
}
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -115,7 +115,7 @@
checkNoResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
JMSServerControl control = createManagementControl();
- control.createQueue(queueName, queueJNDIBinding);
+ control.createQueue(queueName);
Object o = UnitTestCase.checkBinding(context, queueJNDIBinding);
Assert.assertTrue(o instanceof Queue);
@@ -134,7 +134,7 @@
checkNoResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
JMSServerControl control = createManagementControl();
- control.createQueue(queueName, queueJNDIBinding);
+ control.createQueue(queueName);
UnitTestCase.checkBinding(context, queueJNDIBinding);
checkResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
@@ -153,7 +153,7 @@
JMSServerControl control = createManagementControl();
Assert.assertEquals(0, control.getQueueNames().length);
- control.createQueue(queueName, queueJNDIBinding);
+ control.createQueue(queueName);
String[] names = control.getQueueNames();
Assert.assertEquals(1, names.length);
@@ -173,7 +173,7 @@
checkNoResource(ObjectNameBuilder.DEFAULT.getJMSTopicObjectName(topicName));
JMSServerControl control = createManagementControl();
- control.createTopic(topicName, topicJNDIBinding);
+ control.createTopic(topicName);
Object o = UnitTestCase.checkBinding(context, topicJNDIBinding);
Assert.assertTrue(o instanceof Topic);
@@ -191,7 +191,7 @@
checkNoResource(ObjectNameBuilder.DEFAULT.getJMSTopicObjectName(topicName));
JMSServerControl control = createManagementControl();
- control.createTopic(topicName, topicJNDIBinding);
+ control.createTopic(topicName);
checkResource(ObjectNameBuilder.DEFAULT.getJMSTopicObjectName(topicName));
Topic topic = (Topic)context.lookup(topicJNDIBinding);
@@ -224,7 +224,7 @@
JMSServerControl control = createManagementControl();
Assert.assertEquals(0, control.getTopicNames().length);
- control.createTopic(topicName, topicJNDIBinding);
+ control.createTopic(topicName);
String[] names = control.getTopicNames();
Assert.assertEquals(1, names.length);
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -168,14 +168,14 @@
return (Boolean)proxy.invokeOperation("closeConnectionsForAddress", ipAddress);
}
- public boolean createQueue(final String name, final String jndiBinding) throws Exception
+ public boolean createQueue(final String name) throws Exception
{
- return (Boolean)proxy.invokeOperation("createQueue", name, jndiBinding);
+ return (Boolean)proxy.invokeOperation("createQueue", name);
}
- public boolean createTopic(final String name, final String jndiBinding) throws Exception
+ public boolean createTopic(final String name) throws Exception
{
- return (Boolean)proxy.invokeOperation("createTopic", name, jndiBinding);
+ return (Boolean)proxy.invokeOperation("createTopic", name);
}
public void destroyConnectionFactory(final String name) throws Exception
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -76,7 +76,6 @@
Assert.assertEquals(topic.getTopicName(), topicControl.getName());
Assert.assertEquals(topic.getAddress(), topicControl.getAddress());
Assert.assertEquals(topic.isTemporary(), topicControl.isTemporary());
- Assert.assertEquals(topic.getName(), topicControl.getJNDIBinding());
}
public void testGetXXXSubscriptionsCount() throws Exception
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -533,8 +533,8 @@
jmsServer.setContext(context);
jmsServer.start();
- jmsServer.createQueue(JMSBridgeImplTest.SOURCE, "/queue/" + JMSBridgeImplTest.SOURCE, null, true);
- jmsServer.createQueue(JMSBridgeImplTest.TARGET, "/queue/" + JMSBridgeImplTest.TARGET, null, true);
+ jmsServer.createQueue(JMSBridgeImplTest.SOURCE, null, true, "/queue/" + JMSBridgeImplTest.SOURCE);
+ jmsServer.createQueue(JMSBridgeImplTest.TARGET, null, true, "/queue/" + JMSBridgeImplTest.TARGET);
}
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -90,8 +90,8 @@
*/
protected Queue createQueue(final String name) throws Exception, NamingException
{
- jmsServer2.createQueue(name, "/queue/" + name, null, true);
- jmsServer1.createQueue(name, "/queue/" + name, null, true);
+ jmsServer2.createQueue(name, null, true, "/queue/" + name);
+ jmsServer1.createQueue(name, null, true, "/queue/" + name);
return (Queue)context1.lookup("/queue/" + name);
}
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-03-25 18:57:34 UTC (rev 8966)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-03-26 05:08:39 UTC (rev 8967)
@@ -85,7 +85,7 @@
*/
protected Queue createQueue(final String name) throws Exception, NamingException
{
- jmsServer.createQueue(name, "/jms/" + name, null, true);
+ jmsServer.createQueue(name, null, true, "/jms/" + name);
return (Queue)context.lookup("/jms/" + name);
}
14 years, 2 months
JBoss hornetq SVN: r8966 - in branches/Clebert_TMP/src/main/org/hornetq/jms: persistence/impl and 1 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-03-25 14:57:34 -0400 (Thu, 25 Mar 2010)
New Revision: 8966
Added:
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/DestinationType.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/PersistedJNDIBinding.java
Modified:
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
Log:
jndi persistence type
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java 2010-03-25 15:04:26 UTC (rev 8965)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java 2010-03-25 18:57:34 UTC (rev 8966)
@@ -16,6 +16,7 @@
import java.util.List;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.jms.persistence.impl.PersistedJNDIBinding;
/**
* A JMSPersistence
@@ -57,4 +58,5 @@
// Inner classes -------------------------------------------------
+ void storeJndiBinding(PersistedJNDIBinding persistedJNDIBinding);
}
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java 2010-03-25 15:04:26 UTC (rev 8965)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java 2010-03-25 18:57:34 UTC (rev 8966)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.jms.persistence.impl.DestinationType;
import org.hornetq.utils.BufferHelper;
import org.hornetq.utils.DataConstants;
@@ -29,26 +30,11 @@
// Constants -----------------------------------------------------
- public enum Type
- {
- QUEUE,
- TOPIC;
-
- public int getType()
- {
- return this == QUEUE ? 1 : 2;
- }
-
- static Type getType(int type)
- {
- return type == 1 ? QUEUE : TOPIC;
- }
- }
// Attributes ----------------------------------------------------
private long id;
- private Type type;
+ private DestinationType type;
private String name;
@@ -65,12 +51,12 @@
{
}
- public PersistedDestination(final Type type, final String name, final String jndiBinding)
+ public PersistedDestination(final DestinationType type, final String name, final String jndiBinding)
{
this(type, name, jndiBinding, null, false);
}
- public PersistedDestination(final Type type, final String name, final String jndiBinding, final String selector, final boolean durable)
+ public PersistedDestination(final DestinationType type, final String name, final String jndiBinding, final String selector, final boolean durable)
{
this.type = type;
this.name = name;
@@ -109,7 +95,7 @@
return jndiBinding;
}
- public Type getType()
+ public DestinationType getType()
{
return type;
}
@@ -144,7 +130,7 @@
public void decode(final HornetQBuffer buffer)
{
- type = Type.getType(buffer.readInt());
+ type = DestinationType.getType(buffer.readInt());
name = buffer.readString();
jndiBinding = buffer.readString();
selector = buffer.readNullableString();
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/DestinationType.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/DestinationType.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/DestinationType.java 2010-03-25 18:57:34 UTC (rev 8966)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.persistence.impl;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Mar 25, 2010
+ */
+public enum DestinationType
+{
+ QUEUE,
+ TOPIC;
+
+ public int getType()
+ {
+ return this == QUEUE ? 1 : 2;
+ }
+
+ public static DestinationType getType(int type)
+ {
+ return type == 1 ? QUEUE : TOPIC;
+ }
+}
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/PersistedJNDIBinding.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/PersistedJNDIBinding.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/PersistedJNDIBinding.java 2010-03-25 18:57:34 UTC (rev 8966)
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.persistence.impl;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.utils.BufferHelper;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Mar 25, 2010
+ */
+public class PersistedJNDIBinding implements EncodingSupport
+{
+ private long id;
+
+ private DestinationType type;
+
+ private String name;
+
+ private String jndiBinding;
+
+ public PersistedJNDIBinding()
+ {
+ }
+
+ public PersistedJNDIBinding(final DestinationType type, final String name, final String jndiBinding)
+ {
+ this.type = type;
+ this.name = name;
+ this.jndiBinding = jndiBinding;
+ }
+
+ public long getId()
+ {
+ return id;
+ }
+
+ public void setId(final long id)
+ {
+ this.id = id;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public String getJndiBinding()
+ {
+ return jndiBinding;
+ }
+
+ public DestinationType getType()
+ {
+ return type;
+ }
+
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_INT +
+ BufferHelper.sizeOfSimpleString(name) +
+ BufferHelper.sizeOfSimpleString(jndiBinding);
+ }
+
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeInt(type.getType());
+ buffer.writeString(name);
+ buffer.writeString(jndiBinding);
+ }
+
+ public void decode(final HornetQBuffer buffer)
+ {
+ type = DestinationType.getType(buffer.readInt());
+ name = buffer.readString();
+ jndiBinding = buffer.readString();
+ }
+}
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-25 15:04:26 UTC (rev 8965)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-25 18:57:34 UTC (rev 8966)
@@ -45,6 +45,8 @@
import org.hornetq.jms.persistence.JMSStorageManager;
import org.hornetq.jms.persistence.PersistedConnectionFactory;
import org.hornetq.jms.persistence.PersistedDestination;
+import org.hornetq.jms.persistence.impl.DestinationType;
+import org.hornetq.jms.persistence.impl.PersistedJNDIBinding;
import org.hornetq.jms.persistence.impl.journal.JournalJMSStorageManagerImpl;
import org.hornetq.jms.persistence.impl.nullpm.NullJMSStorageManagerImpl;
import org.hornetq.jms.server.JMSServerManager;
@@ -56,9 +58,6 @@
import org.hornetq.jms.server.management.impl.JMSManagementServiceImpl;
import org.hornetq.utils.TimeAndCounterIDGenerator;
-import static org.hornetq.jms.persistence.PersistedDestination.Type.QUEUE;
-import static org.hornetq.jms.persistence.PersistedDestination.Type.TOPIC;
-
/**
* A Deployer used to create and add to JNDI queues, topics and connection
* factories. Typically this would only be used in an app server env.
@@ -86,8 +85,10 @@
*/
private Context context;
- private final Map<String, List<String>> destinations = new HashMap<String, List<String>>();
+ private Map<String, HornetQDestination> destinations = new HashMap<String, HornetQDestination>();
+ private final Map<String, List<String>> destinationBindings = new HashMap<String, List<String>>();
+
private final Map<String, HornetQConnectionFactory> connectionFactories = new HashMap<String, HornetQConnectionFactory>();
private final Map<String, List<String>> connectionFactoryBindings = new HashMap<String, List<String>>();
@@ -219,7 +220,7 @@
deploymentManager.stop();
}
- for (String destination : destinations.keySet())
+ for (String destination : destinationBindings.keySet())
{
undeployDestination(destination);
}
@@ -229,7 +230,7 @@
destroyConnectionFactory(connectionFactory);
}
- destinations.clear();
+ destinationBindings.clear();
connectionFactories.clear();
connectionFactoryBindings.clear();
@@ -302,7 +303,7 @@
boolean added = internalCreateQueue(queueName, jndiBinding, selectorString, durable);
- storage.storeDestination(new PersistedDestination(QUEUE, queueName, jndiBinding, selectorString, durable));
+ storage.storeDestination(new PersistedDestination(DestinationType.QUEUE, queueName, jndiBinding, selectorString, durable));
return added;
}
@@ -313,15 +314,55 @@
boolean added = internalCreateTopic(topicName, jndiBinding);
- storage.storeDestination(new PersistedDestination(TOPIC, topicName, jndiBinding));
+ storage.storeDestination(new PersistedDestination(DestinationType.TOPIC, topicName, jndiBinding));
return added;
}
+ public boolean addTopicToJndi(final String topicName, final String jndiBinding) throws NamingException
+ {
+ HornetQDestination destination = destinations.get(topicName);
+ if(destination == null)
+ {
+ throw new IllegalArgumentException("Topic does not exist");
+ }
+ if(destination.getTopicName() == null)
+ {
+ throw new IllegalArgumentException(topicName + " is not a topic");
+ }
+ boolean added = bindToJndi(jndiBinding, destination);
+ if (added)
+ {
+ addToDestinationBindings(topicName, jndiBinding);
+ storage.storeJndiBinding(new PersistedJNDIBinding(DestinationType.TOPIC, topicName, jndiBinding));
+ }
+ return added;
+ }
+
+ public boolean addQueueToJndi(final String queueName, final String jndiBinding) throws NamingException
+ {
+ HornetQDestination destination = destinations.get(queueName);
+ if(destination == null)
+ {
+ throw new IllegalArgumentException("Queue does not exist");
+ }
+ if(destination.getQueueName() == null)
+ {
+ throw new IllegalArgumentException(queueName + " is not a queue");
+ }
+ boolean added = bindToJndi(jndiBinding, destination);
+ if (added)
+ {
+ addToDestinationBindings(queueName, jndiBinding);
+ storage.storeJndiBinding(new PersistedJNDIBinding(DestinationType.QUEUE, queueName, jndiBinding));
+ }
+ return added;
+ }
+
public synchronized boolean undeployDestination(final String name) throws Exception
{
checkInitialised();
- List<String> jndiBindings = destinations.get(name);
+ List<String> jndiBindings = destinationBindings.get(name);
if (jndiBindings == null || jndiBindings.size() == 0)
{
return false;
@@ -344,7 +385,7 @@
checkInitialised();
undeployDestination(name);
- destinations.remove(name);
+ destinationBindings.remove(name);
jmsManagementService.unregisterQueue(name);
server.getHornetQServerControl().destroyQueue(HornetQDestination.createQueueAddressFromName(name).toString());
storage.deleteDestination(name);
@@ -356,7 +397,7 @@
checkInitialised();
undeployDestination(name);
- destinations.remove(name);
+ destinationBindings.remove(name);
jmsManagementService.unregisterTopic(name);
AddressControl addressControl = (AddressControl)server.getManagementService().getResource(ResourceNames.CORE_ADDRESS + HornetQDestination.createTopicAddressFromName(name));
if (addressControl != null)
@@ -953,11 +994,11 @@
private void addToDestinationBindings(final String destination, final String jndiBinding)
{
- if (destinations.get(destination) == null)
+ if (destinationBindings.get(destination) == null)
{
- destinations.put(destination, new ArrayList<String>());
+ destinationBindings.put(destination, new ArrayList<String>());
}
- destinations.get(destination).add(jndiBinding);
+ destinationBindings.get(destination).add(jndiBinding);
}
private void deploy() throws Exception
@@ -1030,11 +1071,11 @@
for (PersistedDestination destination : destinations)
{
- if(destination.getType() == QUEUE)
+ if(destination.getType() == DestinationType.QUEUE)
{
internalCreateQueue(destination.getName(), destination.getJndiBinding(), destination.getSelector(), destination.isDurable());
}
- else if(destination.getType() == TOPIC)
+ else if(destination.getType() == DestinationType.TOPIC)
{
internalCreateTopic(destination.getName(), destination.getJndiBinding());
}
14 years, 2 months
JBoss hornetq SVN: r8965 - in branches/Clebert_TMP/src/main/org/hornetq/jms: persistence/impl/journal and 2 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-03-25 11:04:26 -0400 (Thu, 25 Mar 2010)
New Revision: 8965
Modified:
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
Log:
added JMS destination persistence
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java 2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java 2010-03-25 15:04:26 UTC (rev 8965)
@@ -37,7 +37,9 @@
// Public --------------------------------------------------------
- void storeDestination(PersistedDestination destination);
+ void storeDestination(PersistedDestination destination) throws Exception;
+
+ void deleteDestination(String name) throws Exception;
List<PersistedDestination> recoverDestinations();
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java 2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java 2010-03-25 15:04:26 UTC (rev 8965)
@@ -13,24 +13,71 @@
package org.hornetq.jms.persistence;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.utils.BufferHelper;
+import org.hornetq.utils.DataConstants;
+
/**
* A PersistedDestination
*
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
*/
-public class PersistedDestination
+public class PersistedDestination implements EncodingSupport
{
+
// Constants -----------------------------------------------------
+ public enum Type
+ {
+ QUEUE,
+ TOPIC;
+
+ public int getType()
+ {
+ return this == QUEUE ? 1 : 2;
+ }
+
+ static Type getType(int type)
+ {
+ return type == 1 ? QUEUE : TOPIC;
+ }
+ }
// Attributes ----------------------------------------------------
+ private long id;
+
+ private Type type;
+
+ private String name;
+
+ private String jndiBinding;
+
+ private String selector;
+
+ private boolean durable;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
+ public PersistedDestination()
+ {
+ }
+
+ public PersistedDestination(final Type type, final String name, final String jndiBinding)
+ {
+ this(type, name, jndiBinding, null, false);
+ }
+
+ public PersistedDestination(final Type type, final String name, final String jndiBinding, final String selector, final boolean durable)
+ {
+ this.type = type;
+ this.name = name;
+ this.jndiBinding = jndiBinding;
+ this.selector = selector;
+ this.durable = durable;
+ }
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
@@ -41,4 +88,66 @@
// Inner classes -------------------------------------------------
+
+ public long getId()
+ {
+ return id;
+ }
+
+ public void setId(final long id)
+ {
+ this.id = id;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public String getJndiBinding()
+ {
+ return jndiBinding;
+ }
+
+ public Type getType()
+ {
+ return type;
+ }
+
+ public String getSelector()
+ {
+ return selector;
+ }
+
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_INT +
+ BufferHelper.sizeOfSimpleString(name) +
+ BufferHelper.sizeOfSimpleString(jndiBinding) +
+ BufferHelper.sizeOfNullableSimpleString(selector) +
+ DataConstants.SIZE_BOOLEAN;
+ }
+
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeInt(type.getType());
+ buffer.writeString(name);
+ buffer.writeString(jndiBinding);
+ buffer.writeNullableString(selector);
+ buffer.writeBoolean(durable);
+ }
+
+ public void decode(final HornetQBuffer buffer)
+ {
+ type = Type.getType(buffer.readInt());
+ name = buffer.readString();
+ jndiBinding = buffer.readString();
+ selector = buffer.readNullableString();
+ durable = buffer.readBoolean();
+ }
}
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java 2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java 2010-03-25 15:04:26 UTC (rev 8965)
@@ -51,6 +51,8 @@
// Constants -----------------------------------------------------
private final byte CF_RECORD = 1;
+
+ private final byte DESTINATION_RECORD = 2;
// Attributes ----------------------------------------------------
@@ -66,6 +68,8 @@
private Map<String, PersistedConnectionFactory> mapFactories = new ConcurrentHashMap<String, PersistedConnectionFactory>();
+ private Map<String, PersistedDestination> destinations = new ConcurrentHashMap<String, PersistedDestination>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -125,7 +129,7 @@
/* (non-Javadoc)
* @see org.hornetq.jms.persistence.JMSStorageManager#storeConnectionFactory(org.hornetq.jms.persistence.PersistedConnectionFactory)
*/
- public void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws Exception
+ public void storeConnectionFactory(final PersistedConnectionFactory connectionFactory) throws Exception
{
deleteConnectionFactory(connectionFactory.getName());
long id = idGenerator.generateID();
@@ -134,7 +138,7 @@
mapFactories.put(connectionFactory.getName(), connectionFactory);
}
- public void deleteConnectionFactory(String cfName) throws Exception
+ public void deleteConnectionFactory(final String cfName) throws Exception
{
PersistedConnectionFactory oldCF = mapFactories.remove(cfName);
if (oldCF != null)
@@ -148,18 +152,31 @@
*/
public List<PersistedDestination> recoverDestinations()
{
- return null;
+ List<PersistedDestination> destinations = new ArrayList<PersistedDestination>(this.destinations.size());
+ destinations.addAll(this.destinations.values());
+ return destinations;
}
/* (non-Javadoc)
* @see org.hornetq.jms.persistence.JMSStorageManager#storeDestination(org.hornetq.jms.persistence.PersistedDestination)
*/
- public void storeDestination(PersistedDestination destination)
+ public void storeDestination(final PersistedDestination destination) throws Exception
{
- // TODO Auto-generated method stub
-
+ deleteDestination(destination.getName());
+ long id = idGenerator.generateID();
+ destination.setId(id);
+ jmsJournal.appendAddRecord(id, DESTINATION_RECORD, destination, true);
+ destinations.put(destination.getName(), destination);
}
+ public void deleteDestination(final String name) throws Exception
+ {
+ PersistedDestination destination = destinations.get(name);
+ if(destination != null)
+ {
+ jmsJournal.appendDeleteRecord(destination.getId(), false);
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.server.HornetQComponent#isStarted()
@@ -225,6 +242,13 @@
cf.setId(id);
mapFactories.put(cf.getName(), cf);
}
+ else if(rec == DESTINATION_RECORD)
+ {
+ PersistedDestination destination = new PersistedDestination();
+ destination.decode(buffer);
+ destination.setId(id);
+ destinations.put(destination.getName(), destination);
+ }
else
{
throw new IllegalStateException("Invalid record type " + rec);
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java 2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java 2010-03-25 15:04:26 UTC (rev 8965)
@@ -68,6 +68,14 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#deleteDestination (org.hornetq.jms.persistence.PersistedDestination)
+ */
+ public void deleteDestination(String name) throws Exception
+ {
+
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.server.HornetQComponent#isStarted()
*/
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-25 14:14:40 UTC (rev 8964)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-25 15:04:26 UTC (rev 8965)
@@ -44,6 +44,7 @@
import org.hornetq.jms.client.SelectorTranslator;
import org.hornetq.jms.persistence.JMSStorageManager;
import org.hornetq.jms.persistence.PersistedConnectionFactory;
+import org.hornetq.jms.persistence.PersistedDestination;
import org.hornetq.jms.persistence.impl.journal.JournalJMSStorageManagerImpl;
import org.hornetq.jms.persistence.impl.nullpm.NullJMSStorageManagerImpl;
import org.hornetq.jms.server.JMSServerManager;
@@ -55,6 +56,9 @@
import org.hornetq.jms.server.management.impl.JMSManagementServiceImpl;
import org.hornetq.utils.TimeAndCounterIDGenerator;
+import static org.hornetq.jms.persistence.PersistedDestination.Type.QUEUE;
+import static org.hornetq.jms.persistence.PersistedDestination.Type.TOPIC;
+
/**
* A Deployer used to create and add to JNDI queues, topics and connection
* factories. Typically this would only be used in an app server env.
@@ -295,50 +299,22 @@
final boolean durable) throws Exception
{
checkInitialised();
- HornetQDestination jBossQueue = HornetQDestination.createQueue(queueName);
- // Convert from JMS selector to core filter
- String coreFilterString = null;
+ boolean added = internalCreateQueue(queueName, jndiBinding, selectorString, durable);
- if (selectorString != null)
- {
- coreFilterString = SelectorTranslator.convertToHornetQFilterString(selectorString);
- }
+ storage.storeDestination(new PersistedDestination(QUEUE, queueName, jndiBinding, selectorString, durable));
- server.getHornetQServerControl().deployQueue(jBossQueue.getAddress(),
- jBossQueue.getAddress(),
- coreFilterString,
- durable);
-
- boolean added = bindToJndi(jndiBinding, jBossQueue);
-
- if (added)
- {
- addToDestinationBindings(queueName, jndiBinding);
- }
-
- jmsManagementService.registerQueue(jBossQueue, jndiBinding);
return added;
}
public synchronized boolean createTopic(final String topicName, final String jndiBinding) throws Exception
{
checkInitialised();
- HornetQDestination jBossTopic = HornetQDestination.createTopic(topicName);
- // We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS
- // checks when routing messages to a topic that
- // does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
- // subscriptions - core has no notion of a topic
- server.getHornetQServerControl().deployQueue(jBossTopic.getAddress(),
- jBossTopic.getAddress(),
- JMSServerManagerImpl.REJECT_FILTER,
- true);
- boolean added = bindToJndi(jndiBinding, jBossTopic);
- if (added)
- {
- addToDestinationBindings(topicName, jndiBinding);
- }
- jmsManagementService.registerTopic(jBossTopic, jndiBinding);
+
+ boolean added = internalCreateTopic(topicName, jndiBinding);
+
+ storage.storeDestination(new PersistedDestination(TOPIC, topicName, jndiBinding));
+
return added;
}
@@ -371,7 +347,7 @@
destinations.remove(name);
jmsManagementService.unregisterQueue(name);
server.getHornetQServerControl().destroyQueue(HornetQDestination.createQueueAddressFromName(name).toString());
-
+ storage.deleteDestination(name);
return true;
}
@@ -401,6 +377,7 @@
}
}
}
+ storage.deleteDestination(name);
return true;
}
@@ -616,6 +593,59 @@
storage.storeConnectionFactory(new PersistedConnectionFactory(cfConfig));
}
+ private boolean internalCreateQueue(final String queueName,
+ final String jndiBinding,
+ final String selectorString,
+ final boolean durable) throws Exception
+ {
+ HornetQDestination jBossQueue = HornetQDestination.createQueue(queueName);
+
+ // Convert from JMS selector to core filter
+ String coreFilterString = null;
+
+ if (selectorString != null)
+ {
+ coreFilterString = SelectorTranslator.convertToHornetQFilterString(selectorString);
+ }
+
+ server.getHornetQServerControl().deployQueue(jBossQueue.getAddress(),
+ jBossQueue.getAddress(),
+ coreFilterString,
+ durable);
+
+ boolean added = bindToJndi(jndiBinding, jBossQueue);
+
+ if (added)
+ {
+ addToDestinationBindings(queueName, jndiBinding);
+ }
+
+ jmsManagementService.registerQueue(jBossQueue, jndiBinding);
+
+ return added;
+ }
+
+ private boolean internalCreateTopic(final String topicName, final String jndiBinding) throws Exception
+ {
+ HornetQDestination jBossTopic = HornetQDestination.createTopic(topicName);
+ // We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS
+ // checks when routing messages to a topic that
+ // does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
+ // subscriptions - core has no notion of a topic
+ server.getHornetQServerControl().deployQueue(jBossTopic.getAddress(),
+ jBossTopic.getAddress(),
+ JMSServerManagerImpl.REJECT_FILTER,
+ true);
+ boolean added = bindToJndi(jndiBinding, jBossTopic);
+ if (added)
+ {
+ addToDestinationBindings(topicName, jndiBinding);
+ }
+ jmsManagementService.registerTopic(jBossTopic, jndiBinding);
+
+ return added;
+ }
+
/**
* @param cfConfig
* @throws HornetQException
@@ -995,6 +1025,20 @@
{
internalCreateCF(cf.getConfig());
}
+
+ List<PersistedDestination> destinations = storage.recoverDestinations();
+
+ for (PersistedDestination destination : destinations)
+ {
+ if(destination.getType() == QUEUE)
+ {
+ internalCreateQueue(destination.getName(), destination.getJndiBinding(), destination.getSelector(), destination.isDurable());
+ }
+ else if(destination.getType() == TOPIC)
+ {
+ internalCreateTopic(destination.getName(), destination.getJndiBinding());
+ }
+ }
}
14 years, 2 months
JBoss hornetq SVN: r8964 - branches/HnetQ_323_cn/docs/user-manual/zh.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-03-25 10:14:40 -0400 (Thu, 25 Mar 2010)
New Revision: 8964
Modified:
branches/HnetQ_323_cn/docs/user-manual/zh/client-classpath.xml
branches/HnetQ_323_cn/docs/user-manual/zh/examples.xml
branches/HnetQ_323_cn/docs/user-manual/zh/jms-core-mapping.xml
Log:
work
Modified: branches/HnetQ_323_cn/docs/user-manual/zh/client-classpath.xml
===================================================================
--- branches/HnetQ_323_cn/docs/user-manual/zh/client-classpath.xml 2010-03-25 10:23:18 UTC (rev 8963)
+++ branches/HnetQ_323_cn/docs/user-manual/zh/client-classpath.xml 2010-03-25 14:14:40 UTC (rev 8964)
@@ -17,36 +17,31 @@
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
<chapter id="client-classpath">
- <title>The Client Classpath</title>
- <para>HornetQ requires several jars on the <emphasis>Client Classpath</emphasis> depending on
- whether the client uses HornetQ Core API, JMS, and JNDI.</para>
+ <title>客户端的Classpath</title>
+ <para>HornetQ的<emphasis>客户端Classpath</emphasis>需要有几个jar文件。具体是哪几个要根据客户端
+ 是需要内核API、JMS和JNDI中的哪些服务来确定。</para>
<warning>
- <para>All the jars mentioned here can be found in the <literal>lib</literal> directory of
- the HornetQ distribution. Be sure you only use the jars from the correct version of the
- release, you <emphasis>must not</emphasis> mix and match versions of jars from different
- HornetQ versions. Mixing and matching different jar versions may cause subtle errors and
- failures to occur.</para>
+ <para>本章所提及的所有jar文件全部在HorneQ发布包的 <literal>lib</literal>目录下。在使用中一定
+ 要确保所有的jar文件属于同一个发布版本。将不同版本的jar混在一起使用可能造成一些难以发现的错误。</para>
</warning>
<section>
- <title>HornetQ Core Client</title>
- <para>If you are using just a pure HornetQ Core client (i.e. no JMS) then you need <literal
- >hornetq-core-client.jar</literal>, <literal>hornetq-transports.jar</literal> and
- <literal>netty.jar</literal> on your client classpath.</para>
+ <title>使用HornetQ内核的客户端</title>
+ <para>如果客户端只使用HornetQ内核(非JMS客户端),需要将 <literal
+ >hornetq-core-client.jar</literal>、 <literal>hornetq-transports.jar</literal> 和
+ <literal>netty.jar</literal> 放到classpath中。</para>
</section>
<section>
- <title>JMS Client</title>
- <para>If you are using JMS on the client side, then you will also need to include <literal
- >hornetq-jms-client.jar</literal> and <literal>jboss-jms-api.jar</literal>.</para>
+ <title>JMS客户端</title>
+ <para>如果客户端使用JMS,需要在classpath上增加两个jar文件: <literal
+ >hornetq-jms-client.jar</literal> 和 <literal>jboss-jms-api.jar</literal>。</para>
<note>
- <para><literal>jboss-jms-api.jar</literal> just contains Java EE API interface classes
- needed for the <literal>javax.jms.*</literal> classes. If you already have a jar
- with these interface classes on your classpath, you will not need it.</para>
+ <para><literal>jboss-jms-api.jar</literal>中包含的只是 <literal>javax.jms.*</literal> 包中的接口类。
+ 如果这些类已经在你的classpath中,则你就不需要这个jar文件。</para>
</note>
</section>
<section>
- <title>JMS Client with JNDI</title>
- <para>If you are looking up JMS resources from the JNDI server co-located with the HornetQ
- standalone server, you wil also need the jar <literal>jnp-client.jar</literal> jar on
- your client classpath as well as any other jars mentioned previously.</para>
+ <title>需要JNDI的JMS客户端</title>
+ <para>如果你的JMS客户端使用JNDI来查找HornetQ单独服务器上的对象,你需要将 <literal>jnp-client.jar</literal> 增加
+ 到你的classpath中。</para>
</section>
</chapter>
Modified: branches/HnetQ_323_cn/docs/user-manual/zh/examples.xml
===================================================================
--- branches/HnetQ_323_cn/docs/user-manual/zh/examples.xml 2010-03-25 10:23:18 UTC (rev 8963)
+++ branches/HnetQ_323_cn/docs/user-manual/zh/examples.xml 2010-03-25 14:14:40 UTC (rev 8964)
@@ -17,98 +17,73 @@
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
<chapter id="examples">
- <title>Examples</title>
- <para>The HornetQ distribution comes with over 70 run out-of-the-box examples demonstrating many
- of the features.</para>
- <para>The examples are available in the distribution, in the <literal>examples</literal>
- directory. Examples are split into JMS and core examples. JMS examples show how a particular
- feature can be used by a normal JMS client. Core examples show how the equivalent feature
- can be used by a core messaging client.</para>
- <para>A set of Java EE examples are also provided which need the JBoss Application Server
- installed to be able to run.</para>
+ <title>例子</title>
+ <para>在HornetQ的发布包中有超过70个不同的例子。这些例子直接可以运行。它们分别展示了HornetQ所具有的各种功能。</para>
+ <para>所有的例子都在HornetQ发布包的 <literal>examples</literal>目录下。所有的例子被分成了两大类:
+ JMS例子和内核例子。JMS例子展现的是JMS的各种功能,内核的例子则展示的是内核API的功能。
+</para>
+ <para>此外HornetQ还提供了一些Java EE的例子,这些例子需要JBoss应用服务器才能运行。</para>
<section>
- <title>JMS Examples</title>
- <para>To run a JMS example, simply <literal>cd</literal> into the appropriate example
- directory and type <literal>./build.sh</literal> (or <literal>build.bat</literal> if you
- are on Windows).</para>
- <para>Here's a listing of the examples with a brief description.</para>
+ <title>JMS 例子</title>
+ <para>要运行一个JMS例子,只要进入到相应例子的子目录,运行 <literal>./build.sh</literal> (或者
+ 在Windows平台上运行<literal>build.bat</literal>)即可。</para>
+ <para>下面列出的这些JMS例子并配有简要的说明。</para>
<section id="application-level-failover">
- <title>Application-Layer Failover</title>
- <para>HornetQ also supports Application-Layer failover, useful in the case that
- replication is not enabled on the server side.</para>
- <para>With Application-Layer failover, it's up to the application to register a JMS
- <literal>ExceptionListener</literal> with HornetQ which will be called by
- HornetQ in the event that connection failure is detected.</para>
- <para>The code in the <literal>ExceptionListener</literal> then recreates the JMS
- connection, session, etc on another node and the application can continue.</para>
- <para>Application-layer failover is an alternative approach to High Availability (HA).
- Application-layer failover differs from automatic failover in that some client side
- coding is required in order to implement this. Also, with Application-layer
- failover, since the old session object dies and a new one is created, any
- uncommitted work in the old session will be lost, and any unacknowledged messages
- might be redelivered.</para>
+ <title>应用层的失效备援(Failover)</title>
+ <para>HornetQ支持应用层的失效备援。这在服务器端没有复制(replication)配置的情况下是很有用的。</para>
+ <para>应用程序可以注册一个JMS <literal>ExceptionListener</literal>。当HornetQ检测到连接故障时,它会
+ 通知这个注册的Listener。</para>
+ <para>这个<literal>ExceptionListener</literal>在接到HornetQ的通知后可以与其它的节点创建
+ 新的连接、会话等对象,以使应用程序能继续运行。</para>
+ <para>应用层的失效备援是实现高可获得性(HA)的一种方法。它与自动失效备援不同之处在于它需要编写额外的代码。
+ 同时由于发生故障时旧的会话结束,这会造成那些还没来得及提交的工作丢失,还会造成任何没有通知的消息被重发。</para>
</section>
<section id="examples.bridge">
- <title>Core Bridge Example</title>
- <para>The <literal>bridge</literal> example demonstrates a core bridge deployed on one
- server, which consumes messages from a local queue and forwards them to an address
- on a second server.</para>
- <para>Core bridges are used to create message flows between any two HornetQ servers
- which are remotely separated. Core bridges are resilient and will cope with
- temporary connection failure allowing them to be an ideal choice for forwarding over
- unreliable connections, e.g. a WAN.</para>
+ <title>内核桥(Bridge)例子</title>
+ <para><literal>bridge</literal>例子展示的是将一个内核桥部署到一个服务器上,从本地的queue接收消息并将其转发到
+ 另一服务器的地址上。</para>
+ <para>内核的bridge可用来在两个互相分开的HornetQ的服务器间建立一个消息流。它可以处理临时性的连接故障,特别适用于
+ 不可靠的网络的情况。广域网就是一个例子。</para>
</section>
<section id="examples.browsers">
- <title>Browser</title>
- <para>The <literal>browser</literal> example shows you how to use a JMS <literal
- >QueueBrowser</literal> with HornetQ.</para>
- <para>Queues are a standard part of JMS, please consult the JMS 1.1 specification for
- full details.</para>
- <para> A <literal>QueueBrowser</literal> is used to look at messages on the queue
- without removing them. It can scan the entire content of a queue or only messages
- matching a message selector.</para>
+ <title>浏览器(Browser)</title>
+ <para><literal>browser</literal>例子展示的是在HornetQ中如何使用JMS <literal
+ >QueueBrowser</literal>。</para>
+ <para>有关JMS queue的概念在JMS 1.1 specification有明确的定义,这里就不再叙述。</para>
+ <para> 一个<literal>QueueBrowser</literal>可以用来观察queue中的消息而影响它们。它可以观察queue中的全部
+ 消息,也可以定义一个选择器(selector)来选择性地察看消息。</para>
</section>
<section>
<title>Client Kickoff</title>
- <para>The <literal>client-kickoff</literal> example shows how to terminate client
- connections given an IP address using the JMX management API.</para>
+ <para><literal>client-kickoff</literal>例子展示的是如何利用JMX管理接口通过已知的IP地址来断开客户端的连接。</para>
</section>
<section>
- <title>Client-Side Load-Balancing</title>
- <para>The <literal>client-side-load-balancing</literal> example demonstrates how
- sessions created from a single JMS <literal>Connection</literal> can
- be created to different nodes of the cluster. In other words it demonstrates how
- HornetQ does client-side load-balancing of sessions across the cluster.</para>
+ <title>客户端的负载均衡</title>
+ <para><literal>client-side-load-balancing</literal>例子展示的是通过一个JMS连接可以在集群的不同节点上创建
+ 会话。也就是说HornetQ可以对客户端的会话创建进行集群内的负载均衡。</para>
</section>
<section id="examples.clustered.grouping">
- <title>Clustered Grouping</title>
- <para>This is similar to the message grouping example except that it demonstrates it
- working over a cluster. Messages sent to different nodes with the same group id will
- be sent to the same node and the same consumer.</para>
+ <title>集群分组</title>
+ <para>与分组(grouping)例子相似,只是本例所展示的是集群的情况。发向不同节点的具有相同组id的消息
+ 都会传送到同一个节点上的同一个接收者(consumer)。</para>
</section>
<section>
- <title>Clustered Queue</title>
- <para>The <literal>clustered-queue</literal> example demonstrates a JMS queue deployed
- on two different nodes. The two nodes are configured to form a cluster. We then
- create a consumer for the queue on each node, and we create a producer on only one
- of the nodes. We then send some messages via the producer, and we verify that both
- consumers receive the sent messages in a round-robin fashion.</para>
+ <title>集群队列</title>
+ <para><literal>clustered-queue</literal> 例子将一个JMS queue部署到两个节点上。这两个节点组成一个集群。
+ 我们在每个节点上各创建一个接收者(consumer),但只在其中一个节点上创建一个发送者(producer)。利用发送者
+ 发送一些消息,然后确认两个接收者以轮换方式(round-robin)接收这些消息。</para>
</section>
<section>
- <title>Clustered Standalone</title>
- <para>The <literal>clustered-standalone</literal> example demonstrates how to configure
- and starts 3 cluster nodes on the same machine to form a cluster. A subscriber for a
- JMS topic is created on each node, and we create a producer on only one of the
- nodes. We then send some messages via the producer, and we verify that the 3
- subscribers receive all the sent messages.</para>
+ <title>单机集群</title>
+ <para><literal>clustered-standalone</literal>例子所展示的是如何在同一台机器上配置并运行
+ 3个节点的集群。在每个节点上都创建了一个JMS topic的订阅者(subscriber)。只在其中一个节点上
+ 创建了一相发送者来向这个topic发送一些消息。然后我们确认所有的subscriber都接收到了这些消息。</para>
</section>
<section>
- <title>Clustered Topic</title>
- <para>The <literal>clustered-topic</literal> example demonstrates a JMS topic deployed
- on two different nodes. The two nodes are configured to form a cluster. We then
- create a subscriber on the topic on each node, and we create a producer on only one
- of the nodes. We then send some messages via the producer, and we verify that both
- subscribers receive all the sent messages.</para>
+ <title>集群的Topic</title>
+ <para><literal>clustered-topic</literal>例子将一个JMS topic部署到两个节点上。这两个节点组成一个集群。
+ 然后在每个节点上创建了一个订阅者(subscriber),只在一个节点上创建一个发送者(producer)。通过这个发
+ 送者发送一些消息,确认两个订阅者都收到了这些消息。</para>
</section>
<section id="examples.consumer-rate-limit">
<title>Message Consumer Rate Limiting</title>
Modified: branches/HnetQ_323_cn/docs/user-manual/zh/jms-core-mapping.xml
===================================================================
--- branches/HnetQ_323_cn/docs/user-manual/zh/jms-core-mapping.xml 2010-03-25 10:23:18 UTC (rev 8963)
+++ branches/HnetQ_323_cn/docs/user-manual/zh/jms-core-mapping.xml 2010-03-25 14:14:40 UTC (rev 8964)
@@ -17,26 +17,20 @@
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
<chapter id="jms-core-mapping">
- <title>Mapping JMS Concepts to the Core API</title>
- <para>This chapter describes how JMS destinations are mapped to HornetQ addresses.</para>
- <para>HornetQ core is JMS-agnostic. It does not have any concept of a JMS topic. A JMS topic is
- implemented in core as an address (the topic name) with zero or more queues bound to it.
- Each queue bound to that address represents a topic subscription. Likewise, a JMS queue is
- implemented as an address (the JMS queue name) with one single queue bound to it which
- represents the JMS queue.</para>
- <para>By convention, all JMS queues map to core queues where the core queue name has the string
- <literal>jms.queue.</literal> prepended to it. E.g. the JMS queue with the name
- "orders.europe" would map to the core queue with the name "jms.queue.orders.europe". The
- address at which the core queue is bound is also given by the core queue name.</para>
- <para>For JMS topics the address at which the queues that represent the subscriptions are bound
- is given by prepending the string "jms.topic." to the name of the JMS topic. E.g. the JMS
- topic with name "news.europe" would map to the core address "jms.topic.news.europe"</para>
- <para>In other words if you send a JMS message to a JMS queue with name "orders.europe" it will
- get routed on the server to any core queues bound to the address "jms.queue.orders.europe".
- If you send a JMS message to a JMS topic with name "news.europe" it will get routed on the
- server to any core queues bound to the address "jms.topic.news.europe".</para>
- <para>If you want to configure settings for a JMS Queue with the name "orders.europe", you need
- to configure the corresponding core queue "jms.queue.orders.europe":</para>
+ <title>JMS与内核API之间的映射关系</title>
+ <para>本意讲述JMS的目标实体(destination)如何映射到HornetQ的地址(addresses)。</para>
+ <para>HornetQ的内核没有JMS的任何实现。在内核中没有topic的概念,它是通过在一个地址上(相当于topic的名字)绑定
+ 零或多个queue来实现JMS topic的功能的。每个绑定的queue相当于该topic的一个订阅(subscription)。
+ 类似地通过在一个地址上(相当于queue的名字)绑定单一的queue就可以实现JMS queue的功能。</para>
+ <para>按照惯例,所有的JMS queue所对应的内核queue的名字都以<literal>jms.queue.</literal>做为开头。比如
+ 当JMS queue的名字是"orders.europe"时,其对应的内核queue的名字应该是"jms.queue.orders.europe"。
+ 那么内核queue所绑定的地址的名字和该内核queue的名字是相同的。</para>
+ <para>同样,所有JMS topic所对应的内核地址的名字都以 "jms.topic."为前缀。比如当一个JMS topic的名字是"news.europe"
+ 时,它对应的内核的地址应该是"jms.topic.news.europe"。</para>
+ <para>换句话说就是如果你向JMS queue “orders.europe"发送一个消息,这个消息就会被路由到绑定在内核地址为“jms.queue.orders.europe”
+ 的同名内核queue中。 如果是向JMS topic “news.europe“发送一个消息,它会被路由到绑定到内核地址为
+ ”jms.topic.news.europe“的所有的内核queue中。</para>
+ <para>具体要配置一个名为“orders.europe"的JMS队列时,你需要配置相应的内核queue“jms.queue.orders.europe“:</para>
<programlisting>
<!-- expired messages in JMS Queue "orders.europe"
will be sent to the JMS Queue "expiry.europe" -->
14 years, 2 months
JBoss hornetq SVN: r8963 - in trunk: src/main/org/hornetq/core/server and 4 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-03-25 06:23:18 -0400 (Thu, 25 Mar 2010)
New Revision: 8963
Modified:
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-334: Prevent management operations until server is started
* add checkStarted() to management operations to throw a IllegalStateException if the server is not started
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-03-25 09:11:43 UTC (rev 8962)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-03-25 10:23:18 UTC (rev 8963)
@@ -131,6 +131,8 @@
public String getVersion()
{
+ checkStarted();
+
clearIO();
try
{
@@ -144,6 +146,8 @@
public boolean isBackup()
{
+ checkStarted();
+
clearIO();
try
{
@@ -157,6 +161,8 @@
public boolean isSharedStore()
{
+ checkStarted();
+
clearIO();
try
{
@@ -170,6 +176,8 @@
public String getBackupConnectorName()
{
+ checkStarted();
+
clearIO();
try
{
@@ -183,7 +191,9 @@
public String getBindingsDirectory()
{
- clearIO();
+ checkStarted();
+
+ clearIO();
try
{
return configuration.getBindingsDirectory();
@@ -196,6 +206,8 @@
public String[] getInterceptorClassNames()
{
+ checkStarted();
+
clearIO();
try
{
@@ -210,6 +222,8 @@
public int getJournalBufferSize()
{
+ checkStarted();
+
clearIO();
try
{
@@ -224,6 +238,8 @@
public int getJournalBufferTimeout()
{
+ checkStarted();
+
clearIO();
try
{
@@ -238,6 +254,8 @@
public int getJournalMaxIO()
{
+ checkStarted();
+
clearIO();
try
{
@@ -252,6 +270,8 @@
public String getJournalDirectory()
{
+ checkStarted();
+
clearIO();
try
{
@@ -265,6 +285,8 @@
public int getJournalFileSize()
{
+ checkStarted();
+
clearIO();
try
{
@@ -278,6 +300,8 @@
public int getJournalMinFiles()
{
+ checkStarted();
+
clearIO();
try
{
@@ -291,6 +315,8 @@
public int getJournalCompactMinFiles()
{
+ checkStarted();
+
clearIO();
try
{
@@ -304,6 +330,8 @@
public int getJournalCompactPercentage()
{
+ checkStarted();
+
clearIO();
try
{
@@ -317,6 +345,8 @@
public boolean isPersistenceEnabled()
{
+ checkStarted();
+
clearIO();
try
{
@@ -330,6 +360,8 @@
public String getJournalType()
{
+ checkStarted();
+
clearIO();
try
{
@@ -343,6 +375,8 @@
public String getPagingDirectory()
{
+ checkStarted();
+
clearIO();
try
{
@@ -356,6 +390,8 @@
public int getScheduledThreadPoolMaxSize()
{
+ checkStarted();
+
clearIO();
try
{
@@ -369,7 +405,9 @@
public int getThreadPoolMaxSize()
{
- clearIO();
+ checkStarted();
+
+ clearIO();
try
{
return configuration.getThreadPoolMaxSize();
@@ -382,6 +420,8 @@
public long getSecurityInvalidationInterval()
{
+ checkStarted();
+
clearIO();
try
{
@@ -395,6 +435,8 @@
public boolean isClustered()
{
+ checkStarted();
+
clearIO();
try
{
@@ -408,6 +450,8 @@
public boolean isCreateBindingsDir()
{
+ checkStarted();
+
clearIO();
try
{
@@ -421,6 +465,8 @@
public boolean isCreateJournalDir()
{
+ checkStarted();
+
clearIO();
try
{
@@ -434,6 +480,8 @@
public boolean isJournalSyncNonTransactional()
{
+ checkStarted();
+
clearIO();
try
{
@@ -447,6 +495,8 @@
public boolean isJournalSyncTransactional()
{
+ checkStarted();
+
clearIO();
try
{
@@ -460,6 +510,8 @@
public boolean isSecurityEnabled()
{
+ checkStarted();
+
clearIO();
try
{
@@ -473,6 +525,8 @@
public boolean isAsyncConnectionExecutionEnabled()
{
+ checkStarted();
+
clearIO();
try
{
@@ -485,6 +539,8 @@
}
public void deployQueue(final String address, final String name, final String filterString) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -502,6 +558,8 @@
public void deployQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception
{
+ checkStarted();
+
SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
clearIO();
try
@@ -517,6 +575,8 @@
public void createQueue(final String address, final String name) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -530,6 +590,8 @@
public void createQueue(final String address, final String name, final boolean durable) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -543,6 +605,8 @@
public void createQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -562,6 +626,8 @@
public String[] getQueueNames()
{
+ checkStarted();
+
clearIO();
try
{
@@ -583,6 +649,8 @@
public String[] getAddressNames()
{
+ checkStarted();
+
clearIO();
try
{
@@ -604,6 +672,8 @@
public void destroyQueue(final String name) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -619,6 +689,8 @@
public int getConnectionCount()
{
+ checkStarted();
+
clearIO();
try
{
@@ -632,6 +704,8 @@
public void enableMessageCounters()
{
+ checkStarted();
+
clearIO();
try
{
@@ -645,6 +719,8 @@
public void disableMessageCounters()
{
+ checkStarted();
+
clearIO();
try
{
@@ -658,6 +734,8 @@
public void resetAllMessageCounters()
{
+ checkStarted();
+
clearIO();
try
{
@@ -671,6 +749,8 @@
public void resetAllMessageCounterHistories()
{
+ checkStarted();
+
clearIO();
try
{
@@ -684,6 +764,8 @@
public boolean isMessageCounterEnabled()
{
+ checkStarted();
+
clearIO();
try
{
@@ -697,6 +779,8 @@
public synchronized long getMessageCounterSamplePeriod()
{
+ checkStarted();
+
clearIO();
try
{
@@ -710,6 +794,10 @@
public synchronized void setMessageCounterSamplePeriod(final long newPeriod)
{
+ checkStarted();
+
+ checkStarted();
+
clearIO();
try
{
@@ -732,6 +820,8 @@
public int getMessageCounterMaxDayCount()
{
+ checkStarted();
+
clearIO();
try
{
@@ -745,6 +835,8 @@
public void setMessageCounterMaxDayCount(final int count)
{
+ checkStarted();
+
clearIO();
try
{
@@ -762,6 +854,8 @@
public String[] listPreparedTransactions()
{
+ checkStarted();
+
clearIO();
try
{
@@ -795,6 +889,8 @@
public String[] listHeuristicCommittedTransactions()
{
+ checkStarted();
+
clearIO();
try
{
@@ -815,6 +911,8 @@
public String[] listHeuristicRolledBackTransactions()
{
+ checkStarted();
+
clearIO();
try
{
@@ -835,6 +933,8 @@
public synchronized boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -862,6 +962,7 @@
public synchronized boolean rollbackPreparedTransaction(final String transactionAsBase64) throws Exception
{
+ checkStarted();
clearIO();
try
@@ -891,6 +992,8 @@
public String[] listRemoteAddresses()
{
+ checkStarted();
+
clearIO();
try
{
@@ -913,6 +1016,8 @@
public String[] listRemoteAddresses(final String ipAddress)
{
+ checkStarted();
+
clearIO();
try
{
@@ -937,6 +1042,8 @@
public synchronized boolean closeConnectionsForAddress(final String ipAddress)
{
+ checkStarted();
+
clearIO();
try
{
@@ -965,6 +1072,8 @@
public String[] listConnectionIDs()
{
+ checkStarted();
+
clearIO();
try
{
@@ -985,6 +1094,8 @@
public String[] listSessions(final String connectionID)
{
+ checkStarted();
+
clearIO();
try
{
@@ -1005,6 +1116,8 @@
public Object[] getConnectors() throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -1035,6 +1148,8 @@
public String getConnectorsAsJSON() throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -1062,6 +1177,8 @@
String deleteTempQueueRoles,
String manageRoles)
{
+ checkStarted();
+
clearIO();
try
{
@@ -1078,6 +1195,8 @@
public void removeSecuritySettings(String addressMatch)
{
+ checkStarted();
+
clearIO();
try
{
@@ -1091,6 +1210,8 @@
public Set<Role> getSecuritySettings(String addressMatch) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -1103,6 +1224,10 @@
}
public Object[] getRoles(String addressMatch) throws Exception
{
+ checkStarted();
+
+ checkStarted();
+
clearIO();
try
{
@@ -1132,6 +1257,8 @@
public String getRolesAsJSON(String addressMatch) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -1152,6 +1279,8 @@
public String getAddressSettingsAsJSON(final String address) throws Exception
{
+ checkStarted();
+
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(address);
Map<String, Object> settings = new HashMap<String, Object>();
if(addressSettings.getDeadLetterAddress() != null)
@@ -1176,7 +1305,7 @@
return jsonObject.toString();
}
- public void addAddressSettings(final String address,
+ public void addAddressSettings(final String address,
final String DLA,
final String expiryAddress,
final boolean lastValueQueue,
@@ -1187,7 +1316,9 @@
final long redistributionDelay,
final boolean sendToDLAOnNoRoute,
final String addressFullMessagePolicy) throws Exception
- {
+ {
+ checkStarted();
+
AddressSettings addressSettings = new AddressSettings();
addressSettings.setDeadLetterAddress(DLA == null?null:new SimpleString(DLA));
addressSettings.setExpiryAddress(expiryAddress == null?null:new SimpleString(expiryAddress));
@@ -1219,16 +1350,22 @@
public AddressSettings getAddressSettings(final String address)
{
+ checkStarted();
+
return server.getAddressSettingsRepository().getMatch(address);
}
public void removeAddressSettings(String addressMatch)
{
+ checkStarted();
+
server.getAddressSettingsRepository().removeMatch(addressMatch);
}
public void sendQueueInfoToQueue(final String queueName, final String address) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -1399,5 +1536,13 @@
{
return MBeanInfoHelper.getMBeanOperationsInfo(HornetQServerControl.class);
}
+
+ private void checkStarted()
+ {
+ if (!server.isStarted())
+ {
+ throw new IllegalStateException("HornetQ Server is not started. It can not be managed yet");
+ }
+ }
}
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-03-25 09:11:43 UTC (rev 8962)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2010-03-25 10:23:18 UTC (rev 8963)
@@ -115,11 +115,15 @@
public String getAddress()
{
+ checkStarted();
+
return address;
}
public String getFilter()
{
+ checkStarted();
+
clearIO();
try
{
@@ -135,6 +139,8 @@
public boolean isDurable()
{
+ checkStarted();
+
clearIO();
try
{
@@ -148,6 +154,8 @@
public boolean isTemporary()
{
+ checkStarted();
+
clearIO();
try
{
@@ -161,6 +169,8 @@
public int getMessageCount()
{
+ checkStarted();
+
clearIO();
try
{
@@ -174,6 +184,8 @@
public int getConsumerCount()
{
+ checkStarted();
+
clearIO();
try
{
@@ -187,6 +199,8 @@
public int getDeliveringCount()
{
+ checkStarted();
+
clearIO();
try
{
@@ -200,6 +214,8 @@
public long getMessagesAdded()
{
+ checkStarted();
+
clearIO();
try
{
@@ -213,6 +229,8 @@
public long getID()
{
+ checkStarted();
+
clearIO();
try
{
@@ -226,6 +244,8 @@
public long getScheduledCount()
{
+ checkStarted();
+
clearIO();
try
{
@@ -239,6 +259,8 @@
public String getDeadLetterAddress()
{
+ checkStarted();
+
clearIO();
try
{
@@ -261,6 +283,8 @@
public void setDeadLetterAddress(final String deadLetterAddress) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -279,6 +303,8 @@
public String getExpiryAddress()
{
+ checkStarted();
+
clearIO();
try
{
@@ -301,6 +327,8 @@
public void setExpiryAddress(final String expiryAddress) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -323,6 +351,8 @@
public Map<String, Object>[] listScheduledMessages() throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -344,6 +374,8 @@
public String listScheduledMessagesAsJSON() throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -357,6 +389,8 @@
public Map<String, Object>[] listMessages(final String filterStr) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -386,6 +420,8 @@
public String listMessagesAsJSON(final String filter) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -399,6 +435,8 @@
public int countMessages(final String filterStr) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -430,6 +468,8 @@
public boolean removeMessage(final long messageID) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -447,6 +487,8 @@
public int removeMessages(final String filterStr) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -462,6 +504,8 @@
public boolean expireMessage(final long messageID) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -475,6 +519,8 @@
public int expireMessages(final String filterStr) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -493,6 +539,8 @@
public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -514,6 +562,8 @@
public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -539,6 +589,8 @@
public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -554,6 +606,8 @@
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -567,6 +621,8 @@
public int changeMessagesPriority(final String filterStr, final int newPriority) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -587,6 +643,8 @@
public boolean changeMessagePriority(final long messageID, final int newPriority) throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -605,6 +663,8 @@
public String listMessageCounter()
{
+ checkStarted();
+
clearIO();
try
{
@@ -622,6 +682,8 @@
public void resetMessageCounter()
{
+ checkStarted();
+
clearIO();
try
{
@@ -635,6 +697,8 @@
public String listMessageCounterAsHTML()
{
+ checkStarted();
+
clearIO();
try
{
@@ -648,6 +712,8 @@
public String listMessageCounterHistory() throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -661,6 +727,8 @@
public String listMessageCounterHistoryAsHTML()
{
+ checkStarted();
+
clearIO();
try
{
@@ -674,6 +742,8 @@
public void pause()
{
+ checkStarted();
+
clearIO();
try
{
@@ -687,6 +757,8 @@
public void resume()
{
+ checkStarted();
+
clearIO();
try
{
@@ -700,6 +772,8 @@
public boolean isPaused() throws Exception
{
+ checkStarted();
+
clearIO();
try
{
@@ -717,11 +791,20 @@
return MBeanInfoHelper.getMBeanOperationsInfo(QueueControl.class);
}
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ private void checkStarted()
+ {
+ if (!postOffice.isStarted())
+ {
+ throw new IllegalStateException("HornetQ Server is not started. Queue can not be managed yet");
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-03-25 09:11:43 UTC (rev 8962)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-03-25 10:23:18 UTC (rev 8963)
@@ -63,6 +63,12 @@
Version getVersion();
+ /**
+ * Returns the resource to manage this HornetQ server.
+ *
+ * Using this control will throw IllegalStateException if the
+ * server is not properly started.
+ */
HornetQServerControlImpl getHornetQServerControl();
void registerActivateCallback(ActivateCallback callback);
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-03-25 09:11:43 UTC (rev 8962)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-03-25 10:23:18 UTC (rev 8963)
@@ -1104,10 +1104,11 @@
{
for (CoreQueueConfiguration config : configuration.getQueueConfigurations())
{
- messagingServerControl.deployQueue(config.getAddress(),
- config.getName(),
- config.getFilterString(),
- config.isDurable());
+ deployQueue(SimpleString.toSimpleString(config.getAddress()),
+ SimpleString.toSimpleString(config.getName()),
+ SimpleString.toSimpleString(config.getFilterString()),
+ config.isDurable(),
+ false);
}
}
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-03-25 09:11:43 UTC (rev 8962)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-03-25 10:23:18 UTC (rev 8963)
@@ -16,7 +16,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ListenerNotFoundException;
@@ -30,7 +29,6 @@
import javax.management.StandardMBean;
import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.jms.management.ConnectionFactoryControl;
@@ -38,13 +36,7 @@
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.api.jms.management.TopicControl;
import org.hornetq.core.management.impl.MBeanInfoHelper;
-import org.hornetq.core.security.Role;
-import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
-import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.server.JMSServerManager;
-import org.hornetq.utils.SecurityFormatter;
-import org.hornetq.utils.json.JSONArray;
-import org.hornetq.utils.json.JSONObject;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@@ -97,7 +89,7 @@
final Object[] liveConnectorTransportParams,
final Object[] backupConnectorsTransportClassNames,
final Object[] backupConnectorTransportParams)
- {
+ {
List<Pair<TransportConfiguration, TransportConfiguration>> pairs = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
for (int i = 0; i < liveConnectorsTransportClassNames.length; i++)
@@ -123,13 +115,13 @@
new TransportConfiguration(backupConnectorsTransportClassNames[i].toString(), backupParams);
}
Pair<TransportConfiguration, TransportConfiguration> pair = new Pair<TransportConfiguration, TransportConfiguration>(tcLive,
- tcBackup);
+ tcBackup);
pairs.add(pair);
}
return pairs;
- }
+ }
public static MBeanNotificationInfo[] getNotificationInfos()
{
@@ -141,7 +133,7 @@
}
return new MBeanNotificationInfo[] { new MBeanNotificationInfo(names,
JMSServerControl.class.getName(),
- "Notifications emitted by a JMS Server") };
+ "Notifications emitted by a JMS Server") };
}
// Constructors --------------------------------------------------
@@ -160,120 +152,231 @@
final String liveTransportClassName,
final Map<String, Object> liveTransportParams,
final Object[] jndiBindings) throws Exception
- {
- List<String> jndiBindingsList = JMSServerControlImpl.convert(jndiBindings);
- TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
+ {
+ checkStarted();
- server.createConnectionFactory(name, liveTC, jndiBindingsList);
+ clearIO();
- sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
- }
+ try
+ {
+ List<String> jndiBindingsList = JMSServerControlImpl.convert(jndiBindings);
+ TransportConfiguration liveTC = new TransportConfiguration(liveTransportClassName, liveTransportParams);
+ server.createConnectionFactory(name, liveTC, jndiBindingsList);
+
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
public void createConnectionFactory(final String name,
final Object[] liveConnectorsTransportClassNames,
final Object[] liveConnectorTransportParams,
final Object[] backupConnectorsTransportClassNames,
final Object[] backupConnectorTransportParams,
final Object[] jndiBindings) throws Exception
- {
- List<Pair<TransportConfiguration, TransportConfiguration>> pairs = JMSServerControlImpl.convertToConnectorPairs(liveConnectorsTransportClassNames,
- liveConnectorTransportParams,
- backupConnectorsTransportClassNames,
- backupConnectorTransportParams);
- List<String> jndiBindingsList = JMSServerControlImpl.convert(jndiBindings);
+ {
+ checkStarted();
- server.createConnectionFactory(name, pairs, jndiBindingsList);
+ clearIO();
- sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
- }
+ try
+ {
+ List<Pair<TransportConfiguration, TransportConfiguration>> pairs = JMSServerControlImpl.convertToConnectorPairs(liveConnectorsTransportClassNames,
+ liveConnectorTransportParams,
+ backupConnectorsTransportClassNames,
+ backupConnectorTransportParams);
+ List<String> jndiBindingsList = JMSServerControlImpl.convert(jndiBindings);
+ server.createConnectionFactory(name, pairs, jndiBindingsList);
+
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
public void createConnectionFactory(final String name,
final String liveTransportClassNames,
final String liveTransportParams,
final String backupTransportClassNames,
final String backupTransportParams,
final String jndiBindings) throws Exception
- {
- Object[] liveClassNames = JMSServerControlImpl.toArray(liveTransportClassNames);
- Object[] liveParams = ManagementHelper.fromCommaSeparatedArrayOfCommaSeparatedKeyValues(liveTransportParams);
- Object[] backupClassNames = JMSServerControlImpl.toArray(backupTransportClassNames);
- Object[] backupParams = ManagementHelper.fromCommaSeparatedArrayOfCommaSeparatedKeyValues(backupTransportParams);;
- Object[] bindings = JMSServerControlImpl.toArray(jndiBindings);
- createConnectionFactory(name, liveClassNames, liveParams, backupClassNames, backupParams, bindings);
- }
+ {
+ checkStarted();
+ clearIO();
+ try
+ {
+ Object[] liveClassNames = JMSServerControlImpl.toArray(liveTransportClassNames);
+ Object[] liveParams = ManagementHelper.fromCommaSeparatedArrayOfCommaSeparatedKeyValues(liveTransportParams);
+ Object[] backupClassNames = JMSServerControlImpl.toArray(backupTransportClassNames);
+ Object[] backupParams = ManagementHelper.fromCommaSeparatedArrayOfCommaSeparatedKeyValues(backupTransportParams);;
+ Object[] bindings = JMSServerControlImpl.toArray(jndiBindings);
+ createConnectionFactory(name, liveClassNames, liveParams, backupClassNames, backupParams, bindings);
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
+
public void createConnectionFactory(final String name,
final String discoveryAddress,
final int discoveryPort,
final Object[] jndiBindings) throws Exception
- {
- List<String> jndiBindingsList = JMSServerControlImpl.convert(jndiBindings);
+ {
+ checkStarted();
- server.createConnectionFactory(name, discoveryAddress, discoveryPort, jndiBindingsList);
+ clearIO();
- sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
- }
+ try
+ {
+ List<String> jndiBindingsList = JMSServerControlImpl.convert(jndiBindings);
+ server.createConnectionFactory(name, discoveryAddress, discoveryPort, jndiBindingsList);
+
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
public void createConnectionFactory(final String name,
final String discoveryAddress,
final int discoveryPort,
final String jndiBindings) throws Exception
- {
- Object[] bindings = JMSServerControlImpl.toArray(jndiBindings);
+ {
+ checkStarted();
- createConnectionFactory(name, discoveryAddress, discoveryPort, bindings);
- }
+ clearIO();
+ try
+ {
+ Object[] bindings = JMSServerControlImpl.toArray(jndiBindings);
+ createConnectionFactory(name, discoveryAddress, discoveryPort, bindings);
+
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
+
public boolean createQueue(final String name, final String jndiBinding) throws Exception
{
- boolean created = server.createQueue(name, jndiBinding, null, true);
- if (created)
+ checkStarted();
+
+ clearIO();
+
+ try
{
- sendNotification(NotificationType.QUEUE_CREATED, name);
+ boolean created = server.createQueue(name, jndiBinding, null, true);
+ if (created)
+ {
+ sendNotification(NotificationType.QUEUE_CREATED, name);
+ }
+ return created;
}
- return created;
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean destroyQueue(final String name) throws Exception
{
- boolean destroyed = server.destroyQueue(name);
- if (destroyed)
+ checkStarted();
+
+ clearIO();
+
+ try
{
- sendNotification(NotificationType.QUEUE_DESTROYED, name);
+ boolean destroyed = server.destroyQueue(name);
+ if (destroyed)
+ {
+ sendNotification(NotificationType.QUEUE_DESTROYED, name);
+ }
+ return destroyed;
}
- return destroyed;
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean createTopic(final String topicName, final String jndiBinding) throws Exception
{
- boolean created = server.createTopic(topicName, jndiBinding);
- if (created)
+ checkStarted();
+
+ clearIO();
+
+ try
{
- sendNotification(NotificationType.TOPIC_CREATED, topicName);
+ boolean created = server.createTopic(topicName, jndiBinding);
+ if (created)
+ {
+ sendNotification(NotificationType.TOPIC_CREATED, topicName);
+ }
+ return created;
}
- return created;
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean destroyTopic(final String name) throws Exception
{
- boolean destroyed = server.destroyTopic(name);
- if (destroyed)
+ checkStarted();
+
+ clearIO();
+
+ try
{
- sendNotification(NotificationType.TOPIC_DESTROYED, name);
+ boolean destroyed = server.destroyTopic(name);
+ if (destroyed)
+ {
+ sendNotification(NotificationType.TOPIC_DESTROYED, name);
+ }
+ return destroyed;
}
- return destroyed;
+ finally
+ {
+ blockOnIO();
+ }
}
public void destroyConnectionFactory(final String name) throws Exception
{
- boolean destroyed = server.destroyConnectionFactory(name);
- if (destroyed)
+ checkStarted();
+
+ clearIO();
+
+ try
{
- sendNotification(NotificationType.CONNECTION_FACTORY_DESTROYED, name);
+ boolean destroyed = server.destroyConnectionFactory(name);
+ if (destroyed)
+ {
+ sendNotification(NotificationType.CONNECTION_FACTORY_DESTROYED, name);
+ }
}
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isStarted()
@@ -283,45 +386,80 @@
public String getVersion()
{
+ checkStarted();
+
return server.getVersion();
}
public String[] getQueueNames()
{
- Object[] queueControls = server.getHornetQServer().getManagementService().getResources(JMSQueueControl.class);
- String[] names = new String[queueControls.length];
- for (int i = 0; i < queueControls.length; i++)
+ checkStarted();
+
+ clearIO();
+
+ try
{
- JMSQueueControl queueControl = (JMSQueueControl)queueControls[i];
- names[i] = queueControl.getName();
+ Object[] queueControls = server.getHornetQServer().getManagementService().getResources(JMSQueueControl.class);
+ String[] names = new String[queueControls.length];
+ for (int i = 0; i < queueControls.length; i++)
+ {
+ JMSQueueControl queueControl = (JMSQueueControl)queueControls[i];
+ names[i] = queueControl.getName();
+ }
+ return names;
}
- return names;
+ finally
+ {
+ blockOnIO();
+ }
}
public String[] getTopicNames()
{
- Object[] topicControls = server.getHornetQServer().getManagementService().getResources(TopicControl.class);
- String[] names = new String[topicControls.length];
- for (int i = 0; i < topicControls.length; i++)
+ checkStarted();
+
+ clearIO();
+
+ try
{
- TopicControl topicControl = (TopicControl)topicControls[i];
- names[i] = topicControl.getName();
+ Object[] topicControls = server.getHornetQServer().getManagementService().getResources(TopicControl.class);
+ String[] names = new String[topicControls.length];
+ for (int i = 0; i < topicControls.length; i++)
+ {
+ TopicControl topicControl = (TopicControl)topicControls[i];
+ names[i] = topicControl.getName();
+ }
+ return names;
}
- return names;
+ finally
+ {
+ blockOnIO();
+ }
}
public String[] getConnectionFactoryNames()
{
- Object[] cfControls = server.getHornetQServer()
- .getManagementService()
- .getResources(ConnectionFactoryControl.class);
- String[] names = new String[cfControls.length];
- for (int i = 0; i < cfControls.length; i++)
+ checkStarted();
+
+ clearIO();
+
+ try
{
- ConnectionFactoryControl cfControl = (ConnectionFactoryControl)cfControls[i];
- names[i] = cfControl.getName();
+ Object[] cfControls = server.getHornetQServer()
+ .getManagementService()
+ .getResources(ConnectionFactoryControl.class);
+ String[] names = new String[cfControls.length];
+ for (int i = 0; i < cfControls.length; i++)
+ {
+ ConnectionFactoryControl cfControl = (ConnectionFactoryControl)cfControls[i];
+ names[i] = cfControl.getName();
+ }
+ return names;
}
- return names;
+ finally
+ {
+ blockOnIO();
+ }
}
// NotificationEmitter implementation ----------------------------
@@ -329,9 +467,9 @@
public void removeNotificationListener(final NotificationListener listener,
final NotificationFilter filter,
final Object handback) throws ListenerNotFoundException
- {
+ {
broadcaster.removeNotificationListener(listener, filter, handback);
- }
+ }
public void removeNotificationListener(final NotificationListener listener) throws ListenerNotFoundException
{
@@ -341,9 +479,9 @@
public void addNotificationListener(final NotificationListener listener,
final NotificationFilter filter,
final Object handback) throws IllegalArgumentException
- {
+ {
broadcaster.addNotificationListener(listener, filter, handback);
- }
+ }
public MBeanNotificationInfo[] getNotificationInfo()
{
@@ -352,32 +490,84 @@
public String[] listRemoteAddresses() throws Exception
{
- return server.listRemoteAddresses();
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ return server.listRemoteAddresses();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String[] listRemoteAddresses(final String ipAddress) throws Exception
{
- return server.listRemoteAddresses(ipAddress);
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ return server.listRemoteAddresses(ipAddress);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean closeConnectionsForAddress(final String ipAddress) throws Exception
{
- return server.closeConnectionsForAddress(ipAddress);
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ return server.closeConnectionsForAddress(ipAddress);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String[] listConnectionIDs() throws Exception
{
- return server.listConnectionIDs();
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ return server.listConnectionIDs();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String[] listSessions(final String connectionID) throws Exception
{
- return server.listSessions(connectionID);
- }
+ checkStarted();
+ clearIO();
+ try
+ {
+ return server.listSessions(connectionID);
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
-
@Override
public MBeanInfo getMBeanInfo()
{
@@ -402,6 +592,41 @@
broadcaster.sendNotification(notif);
}
+ private void checkStarted()
+ {
+ if (!server.isStarted())
+ {
+ throw new IllegalStateException("HornetQ JMS Server is not started. it can not be managed yet");
+ }
+ }
+
+ protected void clearIO()
+ {
+ // the storage manager could be null on the backup on certain components
+ if (server.getHornetQServer().getStorageManager() != null)
+ {
+ server.getHornetQServer().getStorageManager().clearContext();
+ }
+ }
+
+ protected void blockOnIO()
+ {
+ // the storage manager could be null on the backup on certain components
+ if (server.getHornetQServer().getStorageManager() != null)
+ {
+ try
+ {
+ server.getHornetQServer().getStorageManager().waitOnOperations();
+ server.getHornetQServer().getStorageManager().clearContext();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ }
+
// Inner classes -------------------------------------------------
public static enum NotificationType
@@ -414,7 +639,7 @@
CONNECTION_FACTORY_DESTROYED;
}
- private static List<String> toList(final String commaSeparatedString)
+ private static List<String> toList(final String commaSeparatedString)
{
List<String> list = new ArrayList<String>();
if (commaSeparatedString == null || commaSeparatedString.trim().length() == 0)
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-25 09:11:43 UTC (rev 8962)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-25 10:23:18 UTC (rev 8963)
@@ -234,7 +234,7 @@
public boolean isStarted()
{
- return server.getHornetQServerControl().isStarted();
+ return server.isStarted();
}
// JMSServerManager implementation -------------------------------
@@ -275,7 +275,7 @@
{
checkInitialised();
- return server.getHornetQServerControl().getVersion();
+ return server.getVersion().getFullVersion();
}
public synchronized boolean createQueue(final String queueName,
@@ -287,17 +287,18 @@
HornetQDestination jBossQueue = HornetQDestination.createQueue(queueName);
// Convert from JMS selector to core filter
- String coreFilterString = null;
+ SimpleString coreFilterString = null;
if (selectorString != null)
{
- coreFilterString = SelectorTranslator.convertToHornetQFilterString(selectorString);
+ coreFilterString = SimpleString.toSimpleString(SelectorTranslator.convertToHornetQFilterString(selectorString));
}
- server.getHornetQServerControl().deployQueue(jBossQueue.getAddress(),
- jBossQueue.getAddress(),
- coreFilterString,
- durable);
+ server.deployQueue(jBossQueue.getSimpleAddress(),
+ jBossQueue.getSimpleAddress(),
+ coreFilterString,
+ durable,
+ false);
boolean added = bindToJndi(jndiBinding, jBossQueue);
@@ -318,10 +319,12 @@
// checks when routing messages to a topic that
// does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
// subscriptions - core has no notion of a topic
- server.getHornetQServerControl().deployQueue(jBossTopic.getAddress(),
- jBossTopic.getAddress(),
- JMSServerManagerImpl.REJECT_FILTER,
- true);
+ server.deployQueue(jBossTopic.getSimpleAddress(),
+ jBossTopic.getSimpleAddress(),
+ SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER),
+ true,
+ false);
+
boolean added = bindToJndi(jndiBinding, jBossTopic);
if (added)
{
@@ -359,7 +362,7 @@
destinations.remove(name);
jmsManagementService.unregisterQueue(name);
- server.getHornetQServerControl().destroyQueue(HornetQDestination.createQueueAddressFromName(name).toString());
+ server.destroyQueue(HornetQDestination.createQueueAddressFromName(name), null);
return true;
}
@@ -386,7 +389,7 @@
// We can't remove the remote binding. As this would be the bridge associated with the topic on this case
if (binding.getType() != BindingType.REMOTE_QUEUE)
{
- server.getHornetQServerControl().destroyQueue(queueName);
+ server.destroyQueue(SimpleString.toSimpleString(queueName), null);
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2010-03-25 09:11:43 UTC (rev 8962)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2010-03-25 10:23:18 UTC (rev 8963)
@@ -14,11 +14,6 @@
package org.hornetq.tests.integration.jms.server.management;
-import static org.hornetq.tests.util.RandomUtil.randomSimpleString;
-import static org.hornetq.tests.util.RandomUtil.randomString;
-
-import java.util.Set;
-
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
@@ -27,13 +22,10 @@
import junit.framework.Assert;
-import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.AddressControl;
-import org.hornetq.api.core.management.HornetQServerControl;
import org.hornetq.api.core.management.ObjectNameBuilder;
import org.hornetq.api.core.management.ResourceNames;
-import org.hornetq.api.core.management.RoleInfo;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DiscoveryGroupConfiguration;
@@ -42,12 +34,10 @@
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
-import org.hornetq.core.security.Role;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
-import org.hornetq.jms.client.HornetQQueueBrowser;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementControlHelper;
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-03-25 09:11:43 UTC (rev 8962)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-03-25 10:23:18 UTC (rev 8963)
@@ -26,7 +26,6 @@
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.security.Role;
-import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
14 years, 2 months
JBoss hornetq SVN: r8962 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-03-25 05:11:43 -0400 (Thu, 25 Mar 2010)
New Revision: 8962
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
Log:
optimisation - remove locking in producer server side credit management
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-03-25 09:11:43 UTC (rev 8962)
@@ -627,7 +627,7 @@
}
- private Queue<Runnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<Runnable>();
+ private Queue<OurRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<OurRunnable>();
private class MemoryFreedRunnablesExecutor implements Runnable
{
@@ -644,20 +644,49 @@
private final Runnable memoryFreedRunnablesExecutor = new MemoryFreedRunnablesExecutor();
- private final Object runnableLock = new Object();
-
+ class OurRunnable implements Runnable
+ {
+ boolean ran;
+
+ final Runnable runnable;
+
+ OurRunnable(final Runnable runnable)
+ {
+ this.runnable = runnable;
+ }
+
+ public synchronized void run()
+ {
+ if (!ran)
+ {
+ runnable.run();
+
+ ran = true;
+ }
+ }
+ }
+
public void executeRunnableWhenMemoryAvailable(final Runnable runnable)
{
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize != -1)
{
- synchronized (runnableLock)
+ if (sizeInBytes.get() > maxSize)
{
- if (sizeInBytes.get() > maxSize)
+ OurRunnable ourRunnable = new OurRunnable(runnable);
+
+ onMemoryFreedRunnables.add(ourRunnable);
+
+ //We check again to avoid a race condition where the size can come down just after the element
+ //has been added, but the check to execute was done before the element was added
+ //NOTE! We do not fix this race by locking the whole thing, doing this check provides
+ //MUCH better performance in a highly concurrent environment
+ if (sizeInBytes.get() <= maxSize)
{
- onMemoryFreedRunnables.add(runnable);
+ //run it now
+ ourRunnable.run();
+ }
- return;
- }
+ return;
}
}
runnable.run();
@@ -669,16 +698,13 @@
{
if (maxSize != -1)
{
- synchronized (runnableLock)
+ long newSize = sizeInBytes.addAndGet(size);
+
+ if (newSize <= maxSize)
{
- long newSize = sizeInBytes.addAndGet(size);
-
- if (newSize <= maxSize)
+ if (!onMemoryFreedRunnables.isEmpty())
{
- if (!onMemoryFreedRunnables.isEmpty())
- {
- executor.execute(memoryFreedRunnablesExecutor);
- }
+ executor.execute(memoryFreedRunnablesExecutor);
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2010-03-25 05:56:22 UTC (rev 8961)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2010-03-25 09:11:43 UTC (rev 8962)
@@ -661,76 +661,6 @@
server.stop();
}
- public void testBlockingIssue() throws Exception
- {
- // HornetQServer server = createServer(true, true);
- //
- // AddressSettings addressSettings = new AddressSettings();
- // addressSettings.setMaxSizeBytes(300000);
- // addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
- //
- // HierarchicalRepository<AddressSettings> repos = server.getAddressSettingsRepository();
- // repos.addMatch("bar".toString(), addressSettings);
- //
- // server.start();
-
- // ClientSessionFactory sf = createFactory(true);
-
- TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName());
-
- ClientSessionFactory sf = HornetQClient.createClientSessionFactory(tc);
-
-// ClientSession sess = sf.createSession();
-//
-// sess.createQueue("bar", "bar");
-
- int count = 0;
- while (true)
- {
- log.info("*** ITERATION " + count++ + "\n\n\n\n");
- ClientSession session = sf.createTransactedSession();
-
- ClientProducer producer = session.createProducer("bar");
-
- for (int i = 0; i < 1000; i++)
- {
-
- ClientMessage message = session.createMessage(true);
-
- message.getBodyBuffer().writeString("Hello");
-
- producer.send(message);
-
- // log.info("sent " + i);
-
- }
-
- session.commit();
-
- session.close();
-
- session = sf.createSession();
-
- session.start();
-
- ClientConsumer consumer = session.createConsumer("bar");
-
- for (int i = 0; i < 1000; i++)
- {
-
- ClientMessage msgReceived = consumer.receive();
-
- msgReceived.acknowledge();
-
- // log.info("read " + i);
-
- }
-
- session.close();
-
- }
- }
-
public void testProducerCreditsCaching5() throws Exception
{
HornetQServer server = createServer(false, isNetty());
14 years, 2 months
JBoss hornetq SVN: r8961 - in branches/Clebert_TMP: src/main/org/hornetq/core and 19 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-03-25 01:56:22 -0400 (Thu, 25 Mar 2010)
New Revision: 8961
Added:
branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/
branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedAddressSetting.java
branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedRoles.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedConnectionFactory.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
branches/Clebert_TMP/src/main/org/hornetq/utils/BufferHelper.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/AddressSettingsConfigurationStorageTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/JMSConnectionFactoryConfigurationStorageTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/RolesConfigurationStorageTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
Modified:
branches/Clebert_TMP/src/main/org/hornetq/api/core/management/HornetQServerControl.java
branches/Clebert_TMP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/Clebert_TMP/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Clebert_TMP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Clebert_TMP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Clebert_TMP/src/main/org/hornetq/core/settings/impl/AddressSettings.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Uploading local copy to a branch
Modified: branches/Clebert_TMP/src/main/org/hornetq/api/core/management/HornetQServerControl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -524,7 +524,7 @@
AddressSettings getAddressSettings(String address);
- void removeAddressSettings(String addressMatch);
+ void removeAddressSettings(String addressMatch) throws Exception;
/**
* returns the address settings as a JSON string
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -37,6 +37,8 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.messagecounter.MessageCounterManager;
import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistconfig.PersistedRoles;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.server.RemotingService;
@@ -83,9 +85,10 @@
private final NotificationBroadcasterSupport broadcaster;
+ // private final Map
+
// Static --------------------------------------------------------
-
// Constructors --------------------------------------------------
public HornetQServerControlImpl(final PostOffice postOffice,
@@ -483,6 +486,7 @@
blockOnIO();
}
}
+
public void deployQueue(final String address, final String name, final String filterString) throws Exception
{
clearIO();
@@ -1052,7 +1056,7 @@
blockOnIO();
}
}
-
+
public void addSecuritySettings(String addressMatch,
String sendRoles,
String consumeRoles,
@@ -1060,14 +1064,31 @@
String deleteDurableQueueRoles,
String createTempQueueRoles,
String deleteTempQueueRoles,
- String manageRoles)
+ String manageRoles) throws Exception
{
clearIO();
try
{
- Set<Role> roles = SecurityFormatter.createSecurity(sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createTempQueueRoles, deleteTempQueueRoles, manageRoles);
+ Set<Role> roles = SecurityFormatter.createSecurity(sendRoles,
+ consumeRoles,
+ createDurableQueueRoles,
+ deleteDurableQueueRoles,
+ createTempQueueRoles,
+ deleteTempQueueRoles,
+ manageRoles);
- server.getSecurityRepository().addMatch(addressMatch, roles );
+ server.getSecurityRepository().addMatch(addressMatch, roles);
+
+ PersistedRoles persistedRoles = new PersistedRoles(addressMatch,
+ sendRoles,
+ consumeRoles,
+ createDurableQueueRoles,
+ deleteDurableQueueRoles,
+ createTempQueueRoles,
+ deleteTempQueueRoles,
+ manageRoles);
+
+ storageManager.storeSecurityRoles(persistedRoles);
}
finally
{
@@ -1075,13 +1096,13 @@
}
}
-
- public void removeSecuritySettings(String addressMatch)
+ public void removeSecuritySettings(String addressMatch) throws Exception
{
clearIO();
try
{
server.getSecurityRepository().removeMatch(addressMatch);
+ storageManager.deleteSecurityRoles(new SimpleString(addressMatch));
}
finally
{
@@ -1098,9 +1119,10 @@
}
finally
{
- blockOnIO();
+ blockOnIO();
}
}
+
public Object[] getRoles(String addressMatch) throws Exception
{
clearIO();
@@ -1154,11 +1176,11 @@
{
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(address);
Map<String, Object> settings = new HashMap<String, Object>();
- if(addressSettings.getDeadLetterAddress() != null)
+ if (addressSettings.getDeadLetterAddress() != null)
{
settings.put("DLA", addressSettings.getDeadLetterAddress());
}
- if(addressSettings.getExpiryAddress() != null)
+ if (addressSettings.getExpiryAddress() != null)
{
settings.put("expiryAddress", addressSettings.getExpiryAddress());
}
@@ -1169,14 +1191,16 @@
settings.put("redistributionDelay", addressSettings.getRedistributionDelay());
settings.put("lastValueQueue", addressSettings.isLastValueQueue());
settings.put("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute());
- String policy = addressSettings.getAddressFullMessagePolicy() == AddressFullMessagePolicy.PAGE?"PAGE":addressSettings.getAddressFullMessagePolicy() == AddressFullMessagePolicy.BLOCK?"BLOCK":"DROP";
+ String policy = addressSettings.getAddressFullMessagePolicy() == AddressFullMessagePolicy.PAGE ? "PAGE"
+ : addressSettings.getAddressFullMessagePolicy() == AddressFullMessagePolicy.BLOCK ? "BLOCK"
+ : "DROP";
settings.put("addressFullMessagePolicy", policy);
JSONObject jsonObject = new JSONObject(settings);
return jsonObject.toString();
}
- public void addAddressSettings(final String address,
+ public void addAddressSettings(final String address,
final String DLA,
final String expiryAddress,
final boolean lastValueQueue,
@@ -1189,8 +1213,8 @@
final String addressFullMessagePolicy) throws Exception
{
AddressSettings addressSettings = new AddressSettings();
- addressSettings.setDeadLetterAddress(DLA == null?null:new SimpleString(DLA));
- addressSettings.setExpiryAddress(expiryAddress == null?null:new SimpleString(expiryAddress));
+ addressSettings.setDeadLetterAddress(DLA == null ? null : new SimpleString(DLA));
+ addressSettings.setExpiryAddress(expiryAddress == null ? null : new SimpleString(expiryAddress));
addressSettings.setLastValueQueue(lastValueQueue);
addressSettings.setMaxDeliveryAttempts(deliveryAttempts);
addressSettings.setMaxSizeBytes(maxSizeBytes);
@@ -1198,23 +1222,25 @@
addressSettings.setRedeliveryDelay(redeliveryDelay);
addressSettings.setRedistributionDelay(redistributionDelay);
addressSettings.setSendToDLAOnNoRoute(sendToDLAOnNoRoute);
- if(addressFullMessagePolicy == null)
+ if (addressFullMessagePolicy == null)
{
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
}
- else if(addressFullMessagePolicy.equalsIgnoreCase("PAGE"))
+ else if (addressFullMessagePolicy.equalsIgnoreCase("PAGE"))
{
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
}
- else if(addressFullMessagePolicy.equalsIgnoreCase("DROP"))
+ else if (addressFullMessagePolicy.equalsIgnoreCase("DROP"))
{
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
}
- else if(addressFullMessagePolicy.equalsIgnoreCase("BLOCK"))
+ else if (addressFullMessagePolicy.equalsIgnoreCase("BLOCK"))
{
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
}
server.getAddressSettingsRepository().addMatch(address, addressSettings);
+
+ storageManager.storeAddressSetting(new PersistedAddressSetting(new SimpleString(address), addressSettings));
}
public AddressSettings getAddressSettings(final String address)
@@ -1222,9 +1248,10 @@
return server.getAddressSettingsRepository().getMatch(address);
}
- public void removeAddressSettings(String addressMatch)
+ public void removeAddressSettings(String addressMatch) throws Exception
{
server.getAddressSettingsRepository().removeMatch(addressMatch);
+ storageManager.deleteAddressSetting(new SimpleString(addressMatch));
}
public void sendQueueInfoToQueue(final String queueName, final String address) throws Exception
Added: branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedAddressSetting.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedAddressSetting.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedAddressSetting.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistconfig;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.settings.impl.AddressSettings;
+
+/**
+ * A PersistedAddressSetting
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class PersistedAddressSetting implements EncodingSupport
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long storeId;
+
+ private SimpleString addressMatch;
+
+ private AddressSettings setting;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PersistedAddressSetting()
+ {
+ super();
+ }
+
+ /**
+ * @param addressMatch
+ * @param setting
+ */
+ public PersistedAddressSetting(SimpleString addressMatch, AddressSettings setting)
+ {
+ super();
+ this.addressMatch = addressMatch;
+ this.setting = setting;
+ }
+
+ // Public --------------------------------------------------------
+
+ public void setStoreId(long id)
+ {
+ this.storeId = id;
+ }
+
+ public long getStoreId()
+ {
+ return storeId;
+ }
+
+ /**
+ * @return the addressMatch
+ */
+ public SimpleString getAddressMatch()
+ {
+ return addressMatch;
+ }
+
+ /**
+ * @return the setting
+ */
+ public AddressSettings getSetting()
+ {
+ return setting;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ addressMatch = buffer.readSimpleString();
+
+ setting = new AddressSettings();
+ setting.decode(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeSimpleString(addressMatch);
+
+ setting.encode(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return addressMatch.sizeof() + setting.getEncodeSize();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedRoles.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedRoles.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedRoles.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,316 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistconfig;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.EncodingSupport;
+
+/**
+ * A ConfiguredRoles
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PersistedRoles implements EncodingSupport
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long storeId;
+
+ private SimpleString addressMatch;
+
+ private SimpleString sendRoles;
+
+ private SimpleString consumeRoles;
+
+ private SimpleString createDurableQueueRoles;
+
+ private SimpleString deleteDurableQueueRoles;
+
+ private SimpleString createTempQueueRoles;
+
+ private SimpleString deleteTempQueueRoles;
+
+ private SimpleString manageRoles;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PersistedRoles()
+ {
+ }
+
+ /**
+ * @param address
+ * @param addressMatch
+ * @param sendRoles
+ * @param consumeRoles
+ * @param createDurableQueueRoles
+ * @param deleteDurableQueueRoles
+ * @param createTempQueueRoles
+ * @param deleteTempQueueRoles
+ * @param manageRoles
+ */
+ public PersistedRoles(final String addressMatch,
+ final String sendRoles,
+ final String consumeRoles,
+ final String createDurableQueueRoles,
+ final String deleteDurableQueueRoles,
+ final String createTempQueueRoles,
+ final String deleteTempQueueRoles,
+ final String manageRoles)
+ {
+ super();
+ this.addressMatch = SimpleString.toSimpleString(addressMatch);
+ this.sendRoles = SimpleString.toSimpleString(sendRoles);
+ this.consumeRoles = SimpleString.toSimpleString(consumeRoles);
+ this.createDurableQueueRoles = SimpleString.toSimpleString(createDurableQueueRoles);
+ this.deleteDurableQueueRoles = SimpleString.toSimpleString(deleteDurableQueueRoles);
+ this.createTempQueueRoles = SimpleString.toSimpleString(createTempQueueRoles);
+ this.deleteTempQueueRoles = SimpleString.toSimpleString(deleteTempQueueRoles);
+ this.manageRoles = SimpleString.toSimpleString(manageRoles);
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getStoreId()
+ {
+ return storeId;
+ }
+
+ public void setStoreId(final long id)
+ {
+ storeId = id;
+ }
+
+ /**
+ * @return the addressMatch
+ */
+ public SimpleString getAddressMatch()
+ {
+ return addressMatch;
+ }
+
+ /**
+ * @return the sendRoles
+ */
+ public String getSendRoles()
+ {
+ return sendRoles.toString();
+ }
+
+ /**
+ * @return the consumeRoles
+ */
+ public String getConsumeRoles()
+ {
+ return consumeRoles.toString();
+ }
+
+ /**
+ * @return the createDurableQueueRoles
+ */
+ public String getCreateDurableQueueRoles()
+ {
+ return createDurableQueueRoles.toString();
+ }
+
+ /**
+ * @return the deleteDurableQueueRoles
+ */
+ public String getDeleteDurableQueueRoles()
+ {
+ return deleteDurableQueueRoles.toString();
+ }
+
+ /**
+ * @return the createTempQueueRoles
+ */
+ public String getCreateTempQueueRoles()
+ {
+ return createTempQueueRoles.toString();
+ }
+
+ /**
+ * @return the deleteTempQueueRoles
+ */
+ public String getDeleteTempQueueRoles()
+ {
+ return deleteTempQueueRoles.toString();
+ }
+
+ /**
+ * @return the manageRoles
+ */
+ public String getManageRoles()
+ {
+ return manageRoles.toString();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeSimpleString(addressMatch);
+ buffer.writeNullableSimpleString(sendRoles);
+ buffer.writeNullableSimpleString(consumeRoles);
+ buffer.writeNullableSimpleString(createDurableQueueRoles);
+ buffer.writeNullableSimpleString(deleteDurableQueueRoles);
+ buffer.writeNullableSimpleString(createTempQueueRoles);
+ buffer.writeNullableSimpleString(deleteTempQueueRoles);
+ buffer.writeNullableSimpleString(manageRoles);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return addressMatch.sizeof() + SimpleString.sizeofNullableString(sendRoles) +
+ SimpleString.sizeofNullableString(consumeRoles) +
+ SimpleString.sizeofNullableString(createDurableQueueRoles) +
+ SimpleString.sizeofNullableString(deleteDurableQueueRoles) +
+ SimpleString.sizeofNullableString(createTempQueueRoles) +
+ SimpleString.sizeofNullableString(deleteTempQueueRoles) +
+ SimpleString.sizeofNullableString(manageRoles);
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(final HornetQBuffer buffer)
+ {
+ addressMatch = buffer.readSimpleString();
+ sendRoles = buffer.readNullableSimpleString();
+ consumeRoles = buffer.readNullableSimpleString();
+ createDurableQueueRoles = buffer.readNullableSimpleString();
+ deleteDurableQueueRoles = buffer.readNullableSimpleString();
+ createTempQueueRoles = buffer.readNullableSimpleString();
+ deleteTempQueueRoles = buffer.readNullableSimpleString();
+ manageRoles = buffer.readNullableSimpleString();
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((addressMatch == null) ? 0 : addressMatch.hashCode());
+ result = prime * result + ((consumeRoles == null) ? 0 : consumeRoles.hashCode());
+ result = prime * result + ((createDurableQueueRoles == null) ? 0 : createDurableQueueRoles.hashCode());
+ result = prime * result + ((createTempQueueRoles == null) ? 0 : createTempQueueRoles.hashCode());
+ result = prime * result + ((deleteDurableQueueRoles == null) ? 0 : deleteDurableQueueRoles.hashCode());
+ result = prime * result + ((deleteTempQueueRoles == null) ? 0 : deleteTempQueueRoles.hashCode());
+ result = prime * result + ((manageRoles == null) ? 0 : manageRoles.hashCode());
+ result = prime * result + ((sendRoles == null) ? 0 : sendRoles.hashCode());
+ result = prime * result + (int)(storeId ^ (storeId >>> 32));
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PersistedRoles other = (PersistedRoles)obj;
+ if (addressMatch == null)
+ {
+ if (other.addressMatch != null)
+ return false;
+ }
+ else if (!addressMatch.equals(other.addressMatch))
+ return false;
+ if (consumeRoles == null)
+ {
+ if (other.consumeRoles != null)
+ return false;
+ }
+ else if (!consumeRoles.equals(other.consumeRoles))
+ return false;
+ if (createDurableQueueRoles == null)
+ {
+ if (other.createDurableQueueRoles != null)
+ return false;
+ }
+ else if (!createDurableQueueRoles.equals(other.createDurableQueueRoles))
+ return false;
+ if (createTempQueueRoles == null)
+ {
+ if (other.createTempQueueRoles != null)
+ return false;
+ }
+ else if (!createTempQueueRoles.equals(other.createTempQueueRoles))
+ return false;
+ if (deleteDurableQueueRoles == null)
+ {
+ if (other.deleteDurableQueueRoles != null)
+ return false;
+ }
+ else if (!deleteDurableQueueRoles.equals(other.deleteDurableQueueRoles))
+ return false;
+ if (deleteTempQueueRoles == null)
+ {
+ if (other.deleteTempQueueRoles != null)
+ return false;
+ }
+ else if (!deleteTempQueueRoles.equals(other.deleteTempQueueRoles))
+ return false;
+ if (manageRoles == null)
+ {
+ if (other.manageRoles != null)
+ return false;
+ }
+ else if (!manageRoles.equals(other.manageRoles))
+ return false;
+ if (sendRoles == null)
+ {
+ if (other.sendRoles != null)
+ return false;
+ }
+ else if (!sendRoles.equals(other.sendRoles))
+ return false;
+ if (storeId != other.storeId)
+ return false;
+ return true;
+ }
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/persistence/StorageManager.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/persistence/StorageManager.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -13,6 +13,7 @@
package org.hornetq.core.persistence;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -26,6 +27,8 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistconfig.PersistedRoles;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQComponent;
@@ -160,4 +163,16 @@
void addGrouping(GroupBinding groupBinding) throws Exception;
void deleteGrouping(GroupBinding groupBinding) throws Exception;
+
+ void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception;
+
+ void deleteAddressSetting(SimpleString addressMatch) throws Exception;
+
+ List<PersistedAddressSetting> recoverAddressSettings() throws Exception;
+
+ void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception;
+
+ void deleteSecurityRoles(SimpleString addressMatch) throws Exception;
+
+ List<PersistedRoles> recoverPersistedRoles() throws Exception;
}
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -20,6 +20,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.transaction.xa.Xid;
@@ -49,6 +50,8 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistconfig.PersistedRoles;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
@@ -101,6 +104,10 @@
public static final byte ID_COUNTER_RECORD = 24;
+ public static final byte ADDRESS_SETTING_RECORD = 25;
+
+ public static final byte SECURITY_RECORD = 26;
+
// type + expiration + timestamp + priority
public static final int SIZE_FIELDS = DataConstants.SIZE_INT + DataConstants.SIZE_LONG +
DataConstants.SIZE_LONG +
@@ -160,6 +167,12 @@
private final String journalDir;
private final String largeMessagesDirectory;
+
+
+ // Persisted core configuration
+ private final Map<SimpleString, PersistedRoles> mapPersistedRoles = new ConcurrentHashMap<SimpleString, PersistedRoles>();
+
+ private final Map<SimpleString, PersistedAddressSetting> mapPersistedAddressSettings = new ConcurrentHashMap<SimpleString, PersistedAddressSetting>();
public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory)
{
@@ -690,7 +703,69 @@
getContext(syncNonTransactional));
}
+
+
+ public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
+ {
+ deleteAddressSetting(addressSetting.getAddressMatch());
+ long id = idGenerator.generateID();
+ addressSetting.setStoreId(id);
+ bindingsJournal.appendAddRecord(id, ADDRESS_SETTING_RECORD, addressSetting, true);
+ mapPersistedAddressSettings.put(addressSetting.getAddressMatch(), addressSetting);
+ }
+
+ public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
+ {
+ ArrayList<PersistedAddressSetting> list = new ArrayList<PersistedAddressSetting>(mapPersistedAddressSettings.size());
+ list.addAll(mapPersistedAddressSettings.values());
+ return list;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#recoverPersistedRoles()
+ */
+ public List<PersistedRoles> recoverPersistedRoles() throws Exception
+ {
+ ArrayList<PersistedRoles> list = new ArrayList<PersistedRoles>(mapPersistedRoles.size());
+ list.addAll(mapPersistedRoles.values());
+ return list;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeSecurityRoles(org.hornetq.core.persistconfig.PersistedRoles)
+ */
+ public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception
+ {
+
+ deleteSecurityRoles(persistedRoles.getAddressMatch());
+ long id = idGenerator.generateID();
+ persistedRoles.setStoreId(id);
+ bindingsJournal.appendAddRecord(id, SECURITY_RECORD, persistedRoles, true);
+ mapPersistedRoles.put(persistedRoles.getAddressMatch(), persistedRoles);
+ }
+
+ public void deleteAddressSetting(SimpleString addressMatch) throws Exception
+ {
+ PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch);
+ if (oldSetting != null)
+ {
+ bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false);
+ }
+
+ }
+
+ public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
+ {
+ PersistedRoles oldRoles = mapPersistedRoles.remove(addressMatch);
+ if (oldRoles != null)
+ {
+ bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false);
+ }
+ }
+
+
+
public JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
final PagingManager pagingManager,
final ResourceManager resourceManager,
@@ -1028,6 +1103,20 @@
encoding.setId(id);
groupingInfos.add(encoding);
}
+ else if (rec == JournalStorageManager.ADDRESS_SETTING_RECORD)
+ {
+ PersistedAddressSetting setting = new PersistedAddressSetting();
+ setting.decode(buffer);
+ setting.setStoreId(id);
+ mapPersistedAddressSettings.put(setting.getAddressMatch(), setting);
+ }
+ else if (rec == JournalStorageManager.SECURITY_RECORD)
+ {
+ PersistedRoles roles = new PersistedRoles();
+ roles.decode(buffer);
+ roles.setStoreId(id);
+ mapPersistedRoles.put(roles.getAddressMatch(), roles);
+ }
else
{
throw new IllegalStateException("Invalid record type " + rec);
@@ -2051,5 +2140,4 @@
}
-
}
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -13,6 +13,8 @@
package org.hornetq.core.persistence.impl.nullpm;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -29,6 +31,8 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistconfig.PersistedRoles;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
@@ -388,4 +392,48 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#recoverAddressSettings()
+ */
+ public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
+ {
+ return Collections.emptyList();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeAddressSetting(org.hornetq.core.persistconfig.PersistedAddressSetting)
+ */
+ public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#recoverPersistedRoles()
+ */
+ public List<PersistedRoles> recoverPersistedRoles() throws Exception
+ {
+ return Collections.emptyList();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeSecurityRoles(org.hornetq.core.persistconfig.PersistedRoles)
+ */
+ public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteAddressSetting(org.hornetq.api.core.SimpleString)
+ */
+ public void deleteAddressSetting(SimpleString addressMatch) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteSecurityRoles(org.hornetq.api.core.SimpleString)
+ */
+ public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
+ {
+ }
+
}
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -76,11 +76,9 @@
private final HornetQServer server;
private Channel channel;
+
+ private Journal[] journals;
- private Journal bindingsJournal;
-
- private Journal messagingJournal;
-
private JournalStorageManager storage;
private PagingManager pageManager;
@@ -101,6 +99,26 @@
}
// Public --------------------------------------------------------
+
+ public void registerJournal(final byte id, final Journal journal)
+ {
+ if (journals == null || id >= journals.length)
+ {
+ Journal[] oldJournals = journals;
+ journals = new Journal[id + 1];
+
+ if (oldJournals != null)
+ {
+ for (int i = 0 ; i < oldJournals.length; i++)
+ {
+ journals[i] = oldJournals[i];
+ }
+ }
+ }
+
+ journals[id] = journal;
+ }
+
/*
* (non-Javadoc)
* @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
@@ -194,8 +212,8 @@
server.getManagementService().setStorageManager(storage);
- bindingsJournal = storage.getBindingsJournal();
- messagingJournal = storage.getMessageJournal();
+ registerJournal((byte)1, storage.getMessageJournal());
+ registerJournal((byte)0, storage.getBindingsJournal());
// We only need to load internal structures on the backup...
journalLoadInformation = storage.loadInternalOnly();
@@ -591,16 +609,7 @@
*/
private Journal getJournal(final byte journalID)
{
- Journal journalToUse;
- if (journalID == (byte)0)
- {
- journalToUse = bindingsJournal;
- }
- else
- {
- journalToUse = messagingJournal;
- }
- return journalToUse;
+ return this.journals[journalID];
}
// Inner classes -------------------------------------------------
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -59,6 +59,8 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistconfig.PersistedRoles;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
@@ -111,6 +113,7 @@
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.SecurityFormatter;
import org.hornetq.utils.UUID;
import org.hornetq.utils.UUIDGenerator;
import org.hornetq.utils.VersionLoader;
@@ -258,7 +261,7 @@
addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
addressSettingsRepository.setDefault(new AddressSettings());
-
+
securityRepository = new HierarchicalObjectRepository<Set<Role>>();
securityRepository.setDefault(new HashSet<Role>());
@@ -434,9 +437,9 @@
{
memoryManager.stop();
}
-
+
addressSettingsRepository.clear();
-
+
securityRepository.clear();
pagingManager = null;
@@ -554,7 +557,7 @@
final boolean xa,
final SessionCallback callback) throws Exception
{
-
+
if (securityStore != null)
{
securityStore.authenticate(username, password);
@@ -692,7 +695,11 @@
if (queue.getConsumerCount() != 0)
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot delete queue " + queue.getName() + " on binding " + queueName + " - it has consumers = " + binding.getClass().getName());
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot delete queue " + queue.getName() +
+ " on binding " +
+ queueName +
+ " - it has consumers = " +
+ binding.getClass().getName());
}
if (session != null)
@@ -975,7 +982,7 @@
configuration.isBackup());
// Address settings need to deployed initially, since they're require on paging manager.start()
-
+
deployAddressSettingsFromConfiguration();
if (configuration.isFileDeploymentEnabled())
@@ -1119,7 +1126,6 @@
}
}
-
private JournalLoadInformation[] loadJournals() throws Exception
{
JournalLoadInformation[] journalInfo = new JournalLoadInformation[2];
@@ -1130,6 +1136,8 @@
journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
+ recoverStoredConfigs();
+
// Set the node id - must be before we load the queues into the postoffice, but after we load the journal
setNodeID();
@@ -1189,6 +1197,33 @@
return journalInfo;
}
+ /**
+ * @throws Exception
+ */
+ private void recoverStoredConfigs() throws Exception
+ {
+ List<PersistedAddressSetting> adsettings = storageManager.recoverAddressSettings();
+ for (PersistedAddressSetting set : adsettings)
+ {
+ addressSettingsRepository.addMatch(set.getAddressMatch().toString(), set.getSetting());
+ }
+
+ List<PersistedRoles> roles = storageManager.recoverPersistedRoles();
+
+ for (PersistedRoles roleItem : roles)
+ {
+ Set<Role> setRoles = SecurityFormatter.createSecurity(roleItem.getSendRoles(),
+ roleItem.getConsumeRoles(),
+ roleItem.getCreateDurableQueueRoles(),
+ roleItem.getDeleteDurableQueueRoles(),
+ roleItem.getCreateTempQueueRoles(),
+ roleItem.getDeleteTempQueueRoles(),
+ roleItem.getManageRoles());
+
+ securityRepository.addMatch(roleItem.getAddressMatch().toString(), setRoles);
+ }
+ }
+
private void setNodeID() throws Exception
{
if (!configuration.isBackup())
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/settings/impl/AddressSettings.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -15,9 +15,13 @@
import java.io.Serializable;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.settings.Mergeable;
+import org.hornetq.utils.BufferHelper;
+import org.hornetq.utils.DataConstants;
/**
* Configuration settings that are applied on the address level
@@ -25,7 +29,7 @@
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
* @author <a href="tim.fox(a)jboss.com">Tim Fox</a>
*/
-public class AddressSettings implements Mergeable<AddressSettings>, Serializable
+public class AddressSettings implements Mergeable<AddressSettings>, Serializable, EncodingSupport
{
private static final long serialVersionUID = 1607502280582336366L;
@@ -240,4 +244,250 @@
}
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ SimpleString policyStr = buffer.readNullableSimpleString();
+
+ if (policyStr != null)
+ {
+ addressFullMessagePolicy = AddressFullMessagePolicy.valueOf(policyStr.toString());
+ }
+ else
+ {
+ addressFullMessagePolicy = null;
+ }
+
+ maxSizeBytes = BufferHelper.readNullableLong(buffer);
+
+ pageSizeBytes = BufferHelper.readNullableInteger(buffer);
+
+ dropMessagesWhenFull = BufferHelper.readNullableBoolean(buffer);
+
+ maxDeliveryAttempts = BufferHelper.readNullableInteger(buffer);
+
+ messageCounterHistoryDayLimit = BufferHelper.readNullableInteger(buffer);
+
+ redeliveryDelay = BufferHelper.readNullableLong(buffer);
+
+ deadLetterAddress = buffer.readNullableSimpleString();
+
+ expiryAddress = buffer.readNullableSimpleString();
+
+ lastValueQueue = BufferHelper.readNullableBoolean(buffer);
+
+ redeliveryDelay = BufferHelper.readNullableLong(buffer);
+
+ sendToDLAOnNoRoute = BufferHelper.readNullableBoolean(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+
+ return BufferHelper.sizeOfNullableSimpleString(addressFullMessagePolicy != null ? addressFullMessagePolicy.toString()
+ : null) + BufferHelper.sizeOfNullableLong(maxSizeBytes) +
+ BufferHelper.sizeOfNullableInteger(pageSizeBytes) +
+ BufferHelper.sizeOfNullableBoolean(dropMessagesWhenFull) +
+ BufferHelper.sizeOfNullableInteger(maxDeliveryAttempts) +
+ BufferHelper.sizeOfNullableInteger(messageCounterHistoryDayLimit) +
+ BufferHelper.sizeOfNullableLong(redeliveryDelay) +
+ SimpleString.sizeofNullableString(deadLetterAddress) +
+ SimpleString.sizeofNullableString(expiryAddress) +
+ BufferHelper.sizeOfNullableBoolean(lastValueQueue) +
+ BufferHelper.sizeOfNullableLong(redistributionDelay) +
+ BufferHelper.sizeOfNullableBoolean(sendToDLAOnNoRoute);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeNullableSimpleString(addressFullMessagePolicy != null ? new SimpleString(addressFullMessagePolicy.toString())
+ : null);
+
+ BufferHelper.writeNullableLong(buffer, maxSizeBytes);
+
+ BufferHelper.writeNullableInteger(buffer, pageSizeBytes);
+
+ BufferHelper.writeNullableBoolean(buffer, dropMessagesWhenFull);
+
+ BufferHelper.writeNullableInteger(buffer, maxDeliveryAttempts);
+
+ BufferHelper.writeNullableInteger(buffer, messageCounterHistoryDayLimit);
+
+ BufferHelper.writeNullableLong(buffer, redeliveryDelay);
+
+ buffer.writeNullableSimpleString(deadLetterAddress);
+
+ buffer.writeNullableSimpleString(expiryAddress);
+
+ BufferHelper.writeNullableBoolean(buffer, lastValueQueue);
+
+ BufferHelper.writeNullableLong(buffer, redistributionDelay);
+
+ BufferHelper.writeNullableBoolean(buffer, sendToDLAOnNoRoute);
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((addressFullMessagePolicy == null) ? 0 : addressFullMessagePolicy.hashCode());
+ result = prime * result + ((deadLetterAddress == null) ? 0 : deadLetterAddress.hashCode());
+ result = prime * result + ((dropMessagesWhenFull == null) ? 0 : dropMessagesWhenFull.hashCode());
+ result = prime * result + ((expiryAddress == null) ? 0 : expiryAddress.hashCode());
+ result = prime * result + ((lastValueQueue == null) ? 0 : lastValueQueue.hashCode());
+ result = prime * result + ((maxDeliveryAttempts == null) ? 0 : maxDeliveryAttempts.hashCode());
+ result = prime * result + ((maxSizeBytes == null) ? 0 : maxSizeBytes.hashCode());
+ result = prime * result +
+ ((messageCounterHistoryDayLimit == null) ? 0 : messageCounterHistoryDayLimit.hashCode());
+ result = prime * result + ((pageSizeBytes == null) ? 0 : pageSizeBytes.hashCode());
+ result = prime * result + ((redeliveryDelay == null) ? 0 : redeliveryDelay.hashCode());
+ result = prime * result + ((redistributionDelay == null) ? 0 : redistributionDelay.hashCode());
+ result = prime * result + ((sendToDLAOnNoRoute == null) ? 0 : sendToDLAOnNoRoute.hashCode());
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ AddressSettings other = (AddressSettings)obj;
+ if (addressFullMessagePolicy == null)
+ {
+ if (other.addressFullMessagePolicy != null)
+ return false;
+ }
+ else if (!addressFullMessagePolicy.equals(other.addressFullMessagePolicy))
+ return false;
+ if (deadLetterAddress == null)
+ {
+ if (other.deadLetterAddress != null)
+ return false;
+ }
+ else if (!deadLetterAddress.equals(other.deadLetterAddress))
+ return false;
+ if (dropMessagesWhenFull == null)
+ {
+ if (other.dropMessagesWhenFull != null)
+ return false;
+ }
+ else if (!dropMessagesWhenFull.equals(other.dropMessagesWhenFull))
+ return false;
+ if (expiryAddress == null)
+ {
+ if (other.expiryAddress != null)
+ return false;
+ }
+ else if (!expiryAddress.equals(other.expiryAddress))
+ return false;
+ if (lastValueQueue == null)
+ {
+ if (other.lastValueQueue != null)
+ return false;
+ }
+ else if (!lastValueQueue.equals(other.lastValueQueue))
+ return false;
+ if (maxDeliveryAttempts == null)
+ {
+ if (other.maxDeliveryAttempts != null)
+ return false;
+ }
+ else if (!maxDeliveryAttempts.equals(other.maxDeliveryAttempts))
+ return false;
+ if (maxSizeBytes == null)
+ {
+ if (other.maxSizeBytes != null)
+ return false;
+ }
+ else if (!maxSizeBytes.equals(other.maxSizeBytes))
+ return false;
+ if (messageCounterHistoryDayLimit == null)
+ {
+ if (other.messageCounterHistoryDayLimit != null)
+ return false;
+ }
+ else if (!messageCounterHistoryDayLimit.equals(other.messageCounterHistoryDayLimit))
+ return false;
+ if (pageSizeBytes == null)
+ {
+ if (other.pageSizeBytes != null)
+ return false;
+ }
+ else if (!pageSizeBytes.equals(other.pageSizeBytes))
+ return false;
+ if (redeliveryDelay == null)
+ {
+ if (other.redeliveryDelay != null)
+ return false;
+ }
+ else if (!redeliveryDelay.equals(other.redeliveryDelay))
+ return false;
+ if (redistributionDelay == null)
+ {
+ if (other.redistributionDelay != null)
+ return false;
+ }
+ else if (!redistributionDelay.equals(other.redistributionDelay))
+ return false;
+ if (sendToDLAOnNoRoute == null)
+ {
+ if (other.sendToDLAOnNoRoute != null)
+ return false;
+ }
+ else if (!sendToDLAOnNoRoute.equals(other.sendToDLAOnNoRoute))
+ return false;
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "AddressSettings [addressFullMessagePolicy=" + addressFullMessagePolicy +
+ ", deadLetterAddress=" +
+ deadLetterAddress +
+ ", dropMessagesWhenFull=" +
+ dropMessagesWhenFull +
+ ", expiryAddress=" +
+ expiryAddress +
+ ", lastValueQueue=" +
+ lastValueQueue +
+ ", maxDeliveryAttempts=" +
+ maxDeliveryAttempts +
+ ", maxSizeBytes=" +
+ maxSizeBytes +
+ ", messageCounterHistoryDayLimit=" +
+ messageCounterHistoryDayLimit +
+ ", pageSizeBytes=" +
+ pageSizeBytes +
+ ", redeliveryDelay=" +
+ redeliveryDelay +
+ ", redistributionDelay=" +
+ redistributionDelay +
+ ", sendToDLAOnNoRoute=" +
+ sendToDLAOnNoRoute +
+ "]";
+ }
+
}
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.persistence;
+
+import java.util.List;
+
+import org.hornetq.core.server.HornetQComponent;
+
+/**
+ * A JMSPersistence
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface JMSStorageManager extends HornetQComponent
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ void storeDestination(PersistedDestination destination);
+
+ List<PersistedDestination> recoverDestinations();
+
+ void deleteConnectionFactory(String connectionFactory) throws Exception;
+
+ void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws Exception;
+
+ List<PersistedConnectionFactory> recoverConnectionFactories();
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedConnectionFactory.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedConnectionFactory.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedConnectionFactory.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.persistence;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
+
+/**
+ * A PersistedConnectionFactory
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PersistedConnectionFactory implements EncodingSupport
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long id;
+
+ private ConnectionFactoryConfiguration config;
+
+ public PersistedConnectionFactory()
+ {
+ super();
+ }
+
+ /**
+ * @param id
+ * @param config
+ */
+ public PersistedConnectionFactory(final ConnectionFactoryConfiguration config)
+ {
+ super();
+ this.config = config;
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * @return the id
+ */
+ public long getId()
+ {
+ return id;
+ }
+
+ public void setId(final long id)
+ {
+ this.id = id;
+ }
+
+ public String getName()
+ {
+ return config.getName();
+ }
+
+ /**
+ * @return the config
+ */
+ public ConnectionFactoryConfiguration getConfig()
+ {
+ return config;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(final HornetQBuffer buffer)
+ {
+ config = new ConnectionFactoryConfigurationImpl();
+ config.decode(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ config.encode(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return config.getEncodeSize();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.persistence;
+
+/**
+ * A PersistedDestination
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PersistedDestination
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.persistence.impl.journal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.replication.impl.ReplicatedJournal;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.jms.persistence.JMSStorageManager;
+import org.hornetq.jms.persistence.PersistedConnectionFactory;
+import org.hornetq.jms.persistence.PersistedDestination;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.utils.IDGenerator;
+
+/**
+ * A JournalJMSStorageManagerImpl
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalJMSStorageManagerImpl implements JMSStorageManager
+{
+
+ // Constants -----------------------------------------------------
+
+ private final byte CF_RECORD = 1;
+
+ // Attributes ----------------------------------------------------
+
+ private final IDGenerator idGenerator;
+
+ private final String bindingsDir;
+
+ private final boolean createBindingsDir;
+
+ private final Journal jmsJournal;
+
+ private volatile boolean started;
+
+ private Map<String, PersistedConnectionFactory> mapFactories = new ConcurrentHashMap<String, PersistedConnectionFactory>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+ public JournalJMSStorageManagerImpl(final IDGenerator idGenerator,
+ final Configuration config,
+ final ReplicationManager replicator)
+ {
+ if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO)
+ {
+ throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals");
+ }
+
+ bindingsDir = config.getBindingsDirectory();
+
+ if (bindingsDir == null)
+ {
+ throw new NullPointerException("bindings-dir is null");
+ }
+
+ createBindingsDir = config.isCreateBindingsDir();
+
+ SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(bindingsDir);
+
+ Journal localJMS = new JournalImpl(1024 * 1024,
+ 2,
+ config.getJournalCompactMinFiles(),
+ config.getJournalCompactPercentage(),
+ bindingsJMS,
+ "hornetq-jms-config",
+ "jms",
+ 1);
+
+ if (replicator != null)
+ {
+ jmsJournal = new ReplicatedJournal((byte)2, localJMS, replicator);
+ }
+ else
+ {
+ jmsJournal = localJMS;
+ }
+
+ this.idGenerator = idGenerator;
+ }
+
+
+ // Public --------------------------------------------------------
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#recoverConnectionFactories()
+ */
+ public List<PersistedConnectionFactory> recoverConnectionFactories()
+ {
+ List<PersistedConnectionFactory> cfs = new ArrayList<PersistedConnectionFactory>(mapFactories.size());
+ cfs.addAll(mapFactories.values());
+ return cfs;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#storeConnectionFactory(org.hornetq.jms.persistence.PersistedConnectionFactory)
+ */
+ public void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws Exception
+ {
+ deleteConnectionFactory(connectionFactory.getName());
+ long id = idGenerator.generateID();
+ connectionFactory.setId(id);
+ jmsJournal.appendAddRecord(id, CF_RECORD, connectionFactory, true);
+ mapFactories.put(connectionFactory.getName(), connectionFactory);
+ }
+
+ public void deleteConnectionFactory(String cfName) throws Exception
+ {
+ PersistedConnectionFactory oldCF = mapFactories.remove(cfName);
+ if (oldCF != null)
+ {
+ jmsJournal.appendDeleteRecord(oldCF.getId(), false);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#recoverDestinations()
+ */
+ public List<PersistedDestination> recoverDestinations()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#storeDestination(org.hornetq.jms.persistence.PersistedDestination)
+ */
+ public void storeDestination(PersistedDestination destination)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ */
+ public boolean isStarted()
+ {
+ return started;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#start()
+ */
+ public void start() throws Exception
+ {
+
+ checkAndCreateDir(bindingsDir, createBindingsDir);
+
+ jmsJournal.start();
+
+ load();
+
+ started = true;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#stop()
+ */
+ public void stop() throws Exception
+ {
+ jmsJournal.stop();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private void load() throws Exception
+ {
+ mapFactories.clear();
+
+ List<RecordInfo> data = new ArrayList<RecordInfo>();
+
+ ArrayList<PreparedTransactionInfo> list = new ArrayList<PreparedTransactionInfo>();
+
+ jmsJournal.load(data, list, null);
+
+ for (RecordInfo record : data)
+ {
+ long id = record.id;
+
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(record.data);
+
+ byte rec = record.getUserRecordType();
+
+ if (rec == CF_RECORD)
+ {
+ PersistedConnectionFactory cf = new PersistedConnectionFactory();
+ cf.decode(buffer);
+ cf.setId(id);
+ mapFactories.put(cf.getName(), cf);
+ }
+ else
+ {
+ throw new IllegalStateException("Invalid record type " + rec);
+ }
+
+ }
+
+ }
+
+ private void checkAndCreateDir(final String dir, final boolean create)
+ {
+ File f = new File(dir);
+
+ if (!f.exists())
+ {
+ if (create)
+ {
+ if (!f.mkdirs())
+ {
+ throw new IllegalStateException("Failed to create directory " + dir);
+ }
+ }
+ else
+ {
+ throw new IllegalArgumentException("Directory " + dir + " does not exist and will not create it");
+ }
+ }
+ }
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.persistence.impl.nullpm;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.hornetq.jms.persistence.JMSStorageManager;
+import org.hornetq.jms.persistence.PersistedConnectionFactory;
+import org.hornetq.jms.persistence.PersistedDestination;
+
+/**
+ * A NullJMSStorageManagerImpl
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class NullJMSStorageManagerImpl implements JMSStorageManager
+{
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#deleteConnectionFactory(java.lang.String)
+ */
+ public void deleteConnectionFactory(String connectionFactory) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#recoverConnectionFactories()
+ */
+ public List<PersistedConnectionFactory> recoverConnectionFactories()
+ {
+ return Collections.emptyList();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#recoverDestinations()
+ */
+ public List<PersistedDestination> recoverDestinations()
+ {
+ return Collections.emptyList();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#storeConnectionFactory(org.hornetq.jms.persistence.PersistedConnectionFactory)
+ */
+ public void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#storeDestination(org.hornetq.jms.persistence.PersistedDestination)
+ */
+ public void storeDestination(PersistedDestination destination)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ */
+ public boolean isStarted()
+ {
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#start()
+ */
+ public void start() throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#stop()
+ */
+ public void stop() throws Exception
+ {
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -17,6 +17,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.jms.server.JMSServerManager;
/**
@@ -26,7 +27,7 @@
*
*
*/
-public interface ConnectionFactoryConfiguration
+public interface ConnectionFactoryConfiguration extends EncodingSupport
{
String getName();
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -14,12 +14,16 @@
package org.hornetq.jms.server.config.impl;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.utils.BufferHelper;
+import org.hornetq.utils.DataConstants;
/**
* A ConnectionFactoryConfigurationImpl
@@ -34,10 +38,10 @@
// Attributes ----------------------------------------------------
- private final String[] bindings;
+ private String name;
- private final String name;
-
+ private String[] bindings;
+
private String discoveryGroupName;
private String discoveryAddress;
@@ -45,7 +49,7 @@
private int discoveryPort;
private List<Pair<String, String>> connectorNames;
-
+
private List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs;
private String clientID = null;
@@ -112,6 +116,11 @@
// Constructors --------------------------------------------------
+ /** To be used on persistence only */
+ public ConnectionFactoryConfigurationImpl()
+ {
+ }
+
public ConnectionFactoryConfigurationImpl(final String name,
final String discoveryAddress,
final int discoveryPort,
@@ -527,9 +536,306 @@
public void setDiscoveryGroupName(String groupName)
{
this.discoveryGroupName = groupName;
+
+ }
+
+ // Encoding Support Implementation --------------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ name = buffer.readSimpleString().toString();
+
+ int nbindings = buffer.readInt();
+
+ bindings = new String[nbindings];
+
+ for (int i = 0; i < nbindings; i++)
+ {
+ bindings[i] = buffer.readSimpleString().toString();
+ }
+
+ discoveryGroupName = BufferHelper.readNullableSimpleStringAsString(buffer);
+
+ discoveryAddress = BufferHelper.readNullableSimpleStringAsString(buffer);
+
+ discoveryPort = buffer.readInt();
+
+ int nConnectors = buffer.readInt();
+
+ connectorNames = new ArrayList<Pair<String, String>>(nConnectors);
+
+ for (int i = 0; i < nConnectors; i++)
+ {
+ String a = BufferHelper.readNullableSimpleStringAsString(buffer);
+
+ String b = BufferHelper.readNullableSimpleStringAsString(buffer);
+
+ connectorNames.add(new Pair<String, String>(a, b));
+ }
+
+ clientID = BufferHelper.readNullableSimpleStringAsString(buffer);
+
+ discoveryRefreshTimeout = buffer.readLong();
+
+ clientFailureCheckPeriod = buffer.readLong();
+
+ connectionTTL = buffer.readLong();
+
+ callTimeout = buffer.readLong();
+
+ cacheLargeMessagesClient = buffer.readBoolean();
+
+ minLargeMessageSize = buffer.readInt();
+
+ consumerWindowSize = buffer.readInt();
+
+ consumerMaxRate = buffer.readInt();
+
+ confirmationWindowSize = buffer.readInt();
+
+ producerWindowSize = buffer.readInt();
+
+ producerMaxRate = buffer.readInt();
+
+ blockOnAcknowledge = buffer.readBoolean();
+
+ blockOnDurableSend = buffer.readBoolean();
+
+ blockOnNonDurableSend = buffer.readBoolean();
+
+ autoGroup = buffer.readBoolean();
+
+ preAcknowledge = buffer.readBoolean();
+
+ loadBalancingPolicyClassName = buffer.readSimpleString().toString();
+
+ transactionBatchSize = buffer.readInt();
+
+ dupsOKBatchSize = buffer.readInt();
+
+ initialWaitTimeout = buffer.readLong();
+
+ useGlobalPools = buffer.readBoolean();
+
+ scheduledThreadPoolMaxSize = buffer.readInt();
+
+ threadPoolMaxSize = buffer.readInt();
+
+ retryInterval = buffer.readLong();
+
+ retryIntervalMultiplier = buffer.readDouble();
+
+ maxRetryInterval = buffer.readLong();
+
+ reconnectAttempts = buffer.readInt();
+
+ failoverOnServerShutdown = buffer.readBoolean();
+
+ groupID = BufferHelper.readNullableSimpleStringAsString(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ BufferHelper.writeAsSimpleString(buffer, name);
+
+ buffer.writeInt(bindings.length);
+
+ for (String str : bindings)
+ {
+ BufferHelper.writeAsSimpleString(buffer, str);
+ }
+
+ BufferHelper.writeAsNullableSimpleString(buffer, discoveryGroupName);
+
+ BufferHelper.writeAsNullableSimpleString(buffer, discoveryAddress);
+
+ buffer.writeInt(discoveryPort);
+
+ buffer.writeInt(connectorNames == null ? 0 : connectorNames.size());
+
+ if (connectorNames != null)
+ {
+ for (Pair<String, String> namePair : connectorNames)
+ {
+ BufferHelper.writeAsNullableSimpleString(buffer, namePair.a);
+ BufferHelper.writeAsNullableSimpleString(buffer, namePair.b);
+ }
+ }
+
+ BufferHelper.writeAsNullableSimpleString(buffer, clientID);
+
+ buffer.writeLong(discoveryRefreshTimeout);
+
+ buffer.writeLong(clientFailureCheckPeriod);
+
+ buffer.writeLong(connectionTTL);
+
+ buffer.writeLong(callTimeout);
+
+ buffer.writeBoolean(cacheLargeMessagesClient);
+
+ buffer.writeInt(minLargeMessageSize);
+
+ buffer.writeInt(consumerWindowSize);
+
+ buffer.writeInt(consumerMaxRate);
+
+ buffer.writeInt(confirmationWindowSize);
+
+ buffer.writeInt(producerWindowSize);
+
+ buffer.writeInt(producerMaxRate);
+
+ buffer.writeBoolean(blockOnAcknowledge);
+
+ buffer.writeBoolean(blockOnDurableSend);
+
+ buffer.writeBoolean(blockOnNonDurableSend);
+
+ buffer.writeBoolean(autoGroup);
+
+ buffer.writeBoolean(preAcknowledge);
+
+ BufferHelper.writeAsSimpleString(buffer, loadBalancingPolicyClassName);
+
+ buffer.writeInt(transactionBatchSize);
+
+ buffer.writeInt(dupsOKBatchSize);
+
+ buffer.writeLong(initialWaitTimeout);
+
+ buffer.writeBoolean(useGlobalPools);
+
+ buffer.writeInt(scheduledThreadPoolMaxSize);
+
+ buffer.writeInt(threadPoolMaxSize);
+
+ buffer.writeLong(retryInterval);
+
+ buffer.writeDouble(retryIntervalMultiplier);
+
+ buffer.writeLong(maxRetryInterval);
+
+ buffer.writeInt(reconnectAttempts);
+
+ buffer.writeBoolean(failoverOnServerShutdown);
+
+ BufferHelper.writeAsNullableSimpleString(buffer, groupID);
+ }
+
+ private int sizeOfBindings()
+ {
+ int size = DataConstants.SIZE_INT; // for the number of bindings persisted
+
+ for (String str : bindings)
+ {
+ size += BufferHelper.sizeOfSimpleString(str);
+ }
+
+ return size;
+
+ }
+
+ private int sizeOfConnectors()
+ {
+ int size = DataConstants.SIZE_INT; // for the number of connectors persisted
+
+ if (connectorNames != null)
+ {
+ for (Pair<String, String> pair : connectorNames)
+ {
+ size += BufferHelper.sizeOfNullableSimpleString(pair.a);
+ size += BufferHelper.sizeOfNullableSimpleString(pair.b);
+ }
+ }
+
+ return size;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return BufferHelper.sizeOfSimpleString(name) +
+ sizeOfBindings() +
+
+ BufferHelper.sizeOfNullableSimpleString(discoveryGroupName) +
+
+ BufferHelper.sizeOfNullableSimpleString(discoveryAddress)+
+
+ DataConstants.SIZE_INT + // discoveryPort
+
+ sizeOfConnectors() +
+
+ BufferHelper.sizeOfNullableSimpleString(clientID) +
+
+ DataConstants.SIZE_LONG + // discoveryRefreshTimeout
+
+ DataConstants.SIZE_LONG + // clientFailureCheckPeriod
+
+ DataConstants.SIZE_LONG + // connectionTTL
+
+ DataConstants.SIZE_LONG + // callTimeout
+
+ DataConstants.SIZE_BOOLEAN + // cacheLargeMessagesClient
+
+ DataConstants.SIZE_INT + // minLargeMessageSize
+
+ DataConstants.SIZE_INT + // consumerWindowSize
+
+ DataConstants.SIZE_INT + // consumerMaxRate
+
+ DataConstants.SIZE_INT + // confirmationWindowSize
+
+ DataConstants.SIZE_INT + // producerWindowSize
+
+ DataConstants.SIZE_INT + // producerMaxRate
+
+ DataConstants.SIZE_BOOLEAN + // blockOnAcknowledge
+
+ DataConstants.SIZE_BOOLEAN + // blockOnDurableSend
+
+ DataConstants.SIZE_BOOLEAN + // blockOnNonDurableSend
+
+ DataConstants.SIZE_BOOLEAN + // autoGroup
+
+ DataConstants.SIZE_BOOLEAN + // preAcknowledge
+
+ BufferHelper.sizeOfSimpleString(loadBalancingPolicyClassName) +
+
+ DataConstants.SIZE_INT + // transactionBatchSize
+
+ DataConstants.SIZE_INT + // dupsOKBatchSize
+
+ DataConstants.SIZE_LONG + // initialWaitTimeout
+
+ DataConstants.SIZE_BOOLEAN + // useGlobalPools
+
+ DataConstants.SIZE_INT + // scheduledThreadPoolMaxSize
+
+ DataConstants.SIZE_INT + // threadPoolMaxSize
+
+ DataConstants.SIZE_LONG + // retryInterval
+
+ DataConstants.SIZE_DOUBLE + // retryIntervalMultiplier
+
+ DataConstants.SIZE_LONG + // maxRetryInterval
+
+ DataConstants.SIZE_INT + // reconnectAttempts
+
+ DataConstants.SIZE_BOOLEAN + // failoverOnServerShutdown
+
+ BufferHelper.sizeOfNullableSimpleString(groupID);
}
-
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -42,6 +42,10 @@
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.SelectorTranslator;
+import org.hornetq.jms.persistence.JMSStorageManager;
+import org.hornetq.jms.persistence.PersistedConnectionFactory;
+import org.hornetq.jms.persistence.impl.journal.JournalJMSStorageManagerImpl;
+import org.hornetq.jms.persistence.impl.nullpm.NullJMSStorageManagerImpl;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.JMSConfiguration;
@@ -49,6 +53,7 @@
import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.management.JMSManagementService;
import org.hornetq.jms.server.management.impl.JMSManagementServiceImpl;
+import org.hornetq.utils.TimeAndCounterIDGenerator;
/**
* A Deployer used to create and add to JNDI queues, topics and connection
@@ -100,11 +105,15 @@
private boolean contextSet;
private JMSConfiguration config;
+
+ private Configuration coreConfig;
+
+ private JMSStorageManager storage;
public JMSServerManagerImpl(final HornetQServer server) throws Exception
{
this.server = server;
-
+
configFileName = null;
}
@@ -157,6 +166,8 @@
{
deploy();
}
+
+ initJournal();
}
catch (Exception e)
{
@@ -601,7 +612,17 @@
public synchronized void createConnectionFactory(final ConnectionFactoryConfiguration cfConfig) throws Exception
{
+ internalCreateCF(cfConfig);
+ storage.storeConnectionFactory(new PersistedConnectionFactory(cfConfig));
+ }
+ /**
+ * @param cfConfig
+ * @throws HornetQException
+ * @throws Exception
+ */
+ private void internalCreateCF(final ConnectionFactoryConfiguration cfConfig) throws HornetQException, Exception
+ {
ArrayList<String> listBindings = new ArrayList<String>();
for (String str : cfConfig.getBindings())
{
@@ -683,7 +704,6 @@
cfConfig.getGroupID(),
listBindings);
}
-
}
public synchronized void createConnectionFactory(final String name,
@@ -950,6 +970,35 @@
}
/**
+ * @param server
+ */
+ private void initJournal() throws Exception
+ {
+ this.coreConfig = server.getConfiguration();
+
+ if (coreConfig.isPersistenceEnabled())
+ {
+ // TODO: replication
+ storage = new JournalJMSStorageManagerImpl(new TimeAndCounterIDGenerator(), coreConfig, null);
+ }
+ else
+ {
+ storage = new NullJMSStorageManagerImpl();
+ }
+
+ storage.start();
+
+
+ List<PersistedConnectionFactory> cfs = storage.recoverConnectionFactories();
+
+ for (PersistedConnectionFactory cf : cfs)
+ {
+ internalCreateCF(cf.getConfig());
+ }
+ }
+
+
+ /**
* @param cfConfig
* @return
* @throws HornetQException
Added: branches/Clebert_TMP/src/main/org/hornetq/utils/BufferHelper.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/utils/BufferHelper.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/utils/BufferHelper.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+
+/**
+ * Helper methods to read and write from HornetQBuffer.
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class BufferHelper
+{
+
+ /** Size of a String as if it was a Nullable Simple String */
+ public static int sizeOfNullableSimpleString(String str)
+ {
+ if (str == null)
+ {
+ return DataConstants.SIZE_BOOLEAN;
+ }
+ else
+ {
+ return DataConstants.SIZE_BOOLEAN + sizeOfSimpleString(str);
+ }
+ }
+
+ /** Size of a String as it if was a Simple String*/
+ public static int sizeOfSimpleString(String str)
+ {
+ return DataConstants.SIZE_INT + str.length() * 2;
+ }
+
+ public static void writeAsNullableSimpleString(HornetQBuffer buffer, String str)
+ {
+ buffer.writeNullableSimpleString(SimpleString.toSimpleString(str));
+ }
+
+ public static String readNullableSimpleStringAsString(HornetQBuffer buffer)
+ {
+ SimpleString str = buffer.readNullableSimpleString();
+ return str != null ? str.toString() : null;
+ }
+
+ public static void writeAsSimpleString(HornetQBuffer buffer, String str)
+ {
+ buffer.writeSimpleString(new SimpleString(str));
+ }
+
+ /**
+ * @param buffer
+ */
+ public static void writeNullableBoolean(HornetQBuffer buffer, Boolean value)
+ {
+ buffer.writeBoolean(value != null);
+
+ if (value != null)
+ {
+ buffer.writeBoolean(value.booleanValue());
+ }
+ }
+
+ public static int sizeOfNullableBoolean(Boolean value)
+ {
+ return DataConstants.SIZE_BOOLEAN + (value != null ? DataConstants.SIZE_BOOLEAN : 0);
+ }
+
+ public static Boolean readNullableBoolean(HornetQBuffer buffer)
+ {
+ boolean isNotNull = buffer.readBoolean();
+
+ if (isNotNull)
+ {
+ return buffer.readBoolean();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /**
+ * @param buffer
+ */
+ public static void writeNullableLong(HornetQBuffer buffer, Long value)
+ {
+ buffer.writeBoolean(value != null);
+
+ if (value != null)
+ {
+ buffer.writeLong(value.longValue());
+ }
+ }
+
+ public static int sizeOfNullableLong(Long value)
+ {
+ return DataConstants.SIZE_BOOLEAN + (value != null ? DataConstants.SIZE_LONG : 0);
+ }
+
+ public static Long readNullableLong(HornetQBuffer buffer)
+ {
+ boolean isNotNull = buffer.readBoolean();
+
+ if (isNotNull)
+ {
+ return buffer.readLong();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /**
+ * @param buffer
+ */
+ public static void writeNullableInteger(HornetQBuffer buffer, Integer value)
+ {
+ buffer.writeBoolean(value != null);
+
+ if (value != null)
+ {
+ buffer.writeInt(value.intValue());
+ }
+ }
+
+ public static int sizeOfNullableInteger(Integer value)
+ {
+ return DataConstants.SIZE_BOOLEAN + (value != null ? DataConstants.SIZE_INT : 0);
+ }
+
+ public static Integer readNullableInteger(HornetQBuffer buffer)
+ {
+ boolean isNotNull = buffer.readBoolean();
+
+ if (isNotNull)
+ {
+ return buffer.readInt();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+}
Added: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/AddressSettingsConfigurationStorageTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/AddressSettingsConfigurationStorageTest.java (rev 0)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/AddressSettingsConfigurationStorageTest.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+
+/**
+ * A ConfigurationStorageTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class AddressSettingsConfigurationStorageTest extends StorageManagerTestBase
+{
+
+ private Map<SimpleString, PersistedAddressSetting> mapExpectedAddresses;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ mapExpectedAddresses = new HashMap<SimpleString, PersistedAddressSetting>();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ mapExpectedAddresses = null;
+
+ super.tearDown();
+ }
+
+ protected void addAddress(JournalStorageManager journal, String address, AddressSettings setting) throws Exception
+ {
+ SimpleString str = new SimpleString(address);
+ PersistedAddressSetting persistedSetting = new PersistedAddressSetting(str, setting);
+ mapExpectedAddresses.put(str, persistedSetting);
+ journal.storeAddressSetting(persistedSetting);
+ }
+
+ public void testStoreSecuritySettings() throws Exception
+ {
+ createStorage();
+
+ AddressSettings setting = new AddressSettings();
+
+ setting = new AddressSettings();
+
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+
+ setting.setDeadLetterAddress(new SimpleString("some-test"));
+
+ addAddress(journal, "a2", setting);
+
+ journal.stop();
+
+ createStorage();
+
+ checkAddresses(journal);
+
+ setting = new AddressSettings();
+
+ setting.setDeadLetterAddress(new SimpleString("new-adddress"));
+
+ // Replacing the first setting
+ addAddress(journal, "a1", setting);
+
+ journal.stop();
+
+ createStorage();
+
+ checkAddresses(journal);
+
+ journal.stop();
+
+ journal = null;
+
+ }
+
+ /**
+ * @param journal
+ * @throws Exception
+ */
+ private void checkAddresses(JournalStorageManager journal) throws Exception
+ {
+ List<PersistedAddressSetting> listSetting = journal.recoverAddressSettings();
+
+ assertEquals(mapExpectedAddresses.size(), listSetting.size());
+
+ for (PersistedAddressSetting el : listSetting)
+ {
+ PersistedAddressSetting el2 = mapExpectedAddresses.get(el.getAddressMatch());
+
+ assertEquals(el.getAddressMatch(), el2.getAddressMatch());
+ assertEquals(el.getSetting(), el2.getSetting());
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/JMSConnectionFactoryConfigurationStorageTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/JMSConnectionFactoryConfigurationStorageTest.java (rev 0)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/JMSConnectionFactoryConfigurationStorageTest.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.jms.persistence.PersistedConnectionFactory;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
+
+/**
+ * A JMSConnectionFactoryConfigurationStorageTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JMSConnectionFactoryConfigurationStorageTest extends StorageManagerTestBase
+{
+
+ private Map<String, PersistedConnectionFactory> mapExpectedCFs;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ mapExpectedCFs = new HashMap<String, PersistedConnectionFactory>();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ mapExpectedCFs = null;
+
+ super.tearDown();
+ }
+
+ protected void addSetting(PersistedConnectionFactory setting) throws Exception
+ {
+ mapExpectedCFs.put(setting.getName(), setting);
+ jmsJournal.storeConnectionFactory(setting);
+ }
+
+ public void testSettings() throws Exception
+ {
+
+ createJMSStorage();
+
+ String str[] = new String[5];
+ for (int i = 0; i < 5; i++)
+ {
+ str[i] = "str" + i;
+ }
+
+ ConnectionFactoryConfiguration config = new ConnectionFactoryConfigurationImpl("some-name", str);
+
+ addSetting(new PersistedConnectionFactory(config));
+
+ jmsJournal.stop();
+
+ createJMSStorage();
+
+ List<PersistedConnectionFactory> cfs = jmsJournal.recoverConnectionFactories();
+
+ assertEquals(1, cfs.size());
+
+ assertEquals("some-name", cfs.get(0).getName());
+
+ assertEquals(5, cfs.get(0).getConfig().getBindings().length);
+
+ for (int i = 0; i < 5; i++)
+ {
+ assertEquals("str" + i, cfs.get(0).getConfig().getBindings()[i]);
+ }
+ }
+
+ public void testSizeOfCF() throws Exception
+ {
+
+ String str[] = new String[5];
+ for (int i = 0; i < 5; i++)
+ {
+ str[i] = "str" + i;
+ }
+
+ ConnectionFactoryConfiguration config = new ConnectionFactoryConfigurationImpl("some-name", str);
+
+ int size = config.getEncodeSize();
+
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(size);
+
+ config.encode(buffer);
+
+ assertEquals(size, buffer.writerIndex());
+
+ PersistedConnectionFactory persistedCF = new PersistedConnectionFactory(config);
+
+ size = persistedCF.getEncodeSize();
+
+ buffer = HornetQBuffers.fixedBuffer(size);
+
+ persistedCF.encode(buffer);
+
+ assertEquals(size, buffer.writerIndex());
+
+ }
+
+ /**
+ * @param journal
+ * @throws Exception
+ */
+ private void checkSettings() throws Exception
+ {
+ List<PersistedConnectionFactory> listSetting = jmsJournal.recoverConnectionFactories();
+
+ assertEquals(mapExpectedCFs.size(), listSetting.size());
+
+ for (PersistedConnectionFactory el : listSetting)
+ {
+ PersistedConnectionFactory el2 = mapExpectedCFs.get(el.getName());
+
+ assertEquals(el, el2);
+ }
+ }
+
+}
Added: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/RolesConfigurationStorageTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/RolesConfigurationStorageTest.java (rev 0)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/RolesConfigurationStorageTest.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.persistconfig.PersistedRoles;
+
+/**
+ * A ConfigurationStorageTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class RolesConfigurationStorageTest extends StorageManagerTestBase
+{
+
+ private Map<SimpleString, PersistedRoles> mapExpectedSets;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ mapExpectedSets = new HashMap<SimpleString, PersistedRoles>();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ mapExpectedSets = null;
+
+ super.tearDown();
+ }
+
+ protected void addSetting(PersistedRoles setting) throws Exception
+ {
+ mapExpectedSets.put(setting.getAddressMatch(), setting);
+ journal.storeSecurityRoles(setting);
+ }
+
+ public void testStoreSecuritySettings() throws Exception
+ {
+ createStorage();
+
+
+ addSetting(new PersistedRoles("a#",
+ "a1",
+ "a1",
+ "a1",
+ "a1",
+ "a1",
+ "a1",
+ "a1"));
+
+
+ addSetting(new PersistedRoles("a2",
+ "a1",
+ null,
+ "a1",
+ "a1",
+ "a1",
+ "a1",
+ "a1"));
+
+ journal.stop();
+
+ checkSettings();
+
+ createStorage();
+
+ checkSettings();
+
+ addSetting(new PersistedRoles("a2",
+ "a1",
+ null,
+ "a1",
+ "a1",
+ "a1",
+ "a1",
+ "a1"));
+
+ addSetting(new PersistedRoles("a3",
+ "a1",
+ null,
+ "a1",
+ "a1",
+ "a1",
+ "a1",
+ "a1"));
+
+ checkSettings();
+
+ journal.stop();
+
+ createStorage();
+
+ checkSettings();
+
+ journal.stop();
+
+ journal = null;
+
+ }
+
+ /**
+ * @param journal
+ * @throws Exception
+ */
+ private void checkSettings() throws Exception
+ {
+ List<PersistedRoles> listSetting = journal.recoverPersistedRoles();
+
+ assertEquals(mapExpectedSets.size(), listSetting.size());
+
+ for (PersistedRoles el : listSetting)
+ {
+ PersistedRoles el2 = mapExpectedSets.get(el.getAddressMatch());
+
+ assertEquals(el, el2);
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java (rev 0)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.QueueBindingInfo;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.Queue;
+import org.hornetq.jms.persistence.JMSStorageManager;
+import org.hornetq.jms.persistence.impl.journal.JournalJMSStorageManagerImpl;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.TimeAndCounterIDGenerator;
+
+/**
+ * A StorageManagerTestBase
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class StorageManagerTestBase extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ protected ExecutorService executor;
+
+ protected ExecutorFactory execFactory;
+
+ protected JournalStorageManager journal;
+
+ protected JMSStorageManager jmsJournal;
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ executor = Executors.newCachedThreadPool();
+
+ execFactory = new OrderedExecutorFactory(executor);
+
+ File testdir = new File(getTestDir());
+
+ deleteDirectory(testdir);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ executor.shutdown();
+
+ if (journal != null)
+ {
+ try
+ {
+ journal.stop();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(); // >> junit report
+ }
+
+ journal = null;
+ }
+
+ if (jmsJournal != null)
+ {
+ try
+ {
+ jmsJournal.stop();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(); // >> junit report
+ }
+
+ jmsJournal = null;
+ }
+
+ super.tearDown();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ protected void createStorage() throws Exception
+ {
+ Configuration configuration = createDefaultConfig();
+
+ configuration.setJournalType(JournalType.ASYNCIO);
+
+ journal = new JournalStorageManager(configuration, execFactory);
+
+ journal.start();
+
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
+
+ Map<Long, Queue> queues = new HashMap<Long, Queue>();
+
+ journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null);
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ protected void createJMSStorage() throws Exception
+ {
+ Configuration configuration = createDefaultConfig();
+
+ configuration.setJournalType(JournalType.ASYNCIO);
+
+ jmsJournal = new JournalJMSStorageManagerImpl(new TimeAndCounterIDGenerator(), configuration, null);
+
+ jmsJournal.start();
+ }
+
+
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -48,6 +49,8 @@
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistconfig.PersistedRoles;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
@@ -1307,10 +1310,52 @@
public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
{
- // TODO Auto-generated method stub
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#recoverAddressSettings()
+ */
+ public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
+ {
+ return Collections.emptyList();
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#recoverPersistedRoles()
+ */
+ public List<PersistedRoles> recoverPersistedRoles() throws Exception
+ {
+ return Collections.emptyList();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeAddressSetting(org.hornetq.core.persistconfig.PersistedAddressSetting)
+ */
+ public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeSecurityRoles(org.hornetq.core.persistconfig.PersistedRoles)
+ */
+ public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteAddressSetting(org.hornetq.api.core.SimpleString)
+ */
+ public void deleteAddressSetting(SimpleString addressMatch) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteSecurityRoles(org.hornetq.api.core.SimpleString)
+ */
+ public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
+ {
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
14 years, 2 months
JBoss hornetq SVN: r8960 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-03-25 01:53:03 -0400 (Thu, 25 Mar 2010)
New Revision: 8960
Added:
branches/Clebert_TMP/
Log:
Creating temporary branch
Copied: branches/Clebert_TMP (from rev 8959, trunk)
14 years, 2 months
JBoss hornetq SVN: r8959 - in trunk/src/config: jboss-6/non-clustered and 6 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-03-24 12:08:44 -0400 (Wed, 24 Mar 2010)
New Revision: 8959
Modified:
trunk/src/config/jboss-6/clustered/hornetq-configuration.xml
trunk/src/config/jboss-6/non-clustered/hornetq-configuration.xml
trunk/src/config/jboss-as/clustered/hornetq-configuration.xml
trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
trunk/src/config/trunk/clustered/hornetq-configuration.xml
trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
Log:
fixed and made some changes to our default configs
Modified: trunk/src/config/jboss-6/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-6/clustered/hornetq-configuration.xml 2010-03-24 15:46:47 UTC (rev 8958)
+++ trunk/src/config/jboss-6/clustered/hornetq-configuration.xml 2010-03-24 16:08:44 UTC (rev 8959)
@@ -9,6 +9,8 @@
<bindings-directory>${jboss.server.data.dir}/hornetq/bindings</bindings-directory>
<journal-directory>${jboss.server.data.dir}/hornetq/journal</journal-directory>
+
+ <journal-min-files>10</journal-min-files>
<large-messages-directory>${jboss.server.data.dir}/hornetq/largemessages</large-messages-directory>
@@ -80,9 +82,9 @@
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
- <max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
+ <max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
Modified: trunk/src/config/jboss-6/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-6/non-clustered/hornetq-configuration.xml 2010-03-24 15:46:47 UTC (rev 8958)
+++ trunk/src/config/jboss-6/non-clustered/hornetq-configuration.xml 2010-03-24 16:08:44 UTC (rev 8959)
@@ -7,6 +7,8 @@
<bindings-directory>${jboss.server.data.dir}/hornetq/bindings</bindings-directory>
<journal-directory>${jboss.server.data.dir}/hornetq/journal</journal-directory>
+
+ <journal-min-files>10</journal-min-files>
<large-messages-directory>${jboss.server.data.dir}/hornetq/largemessages</large-messages-directory>
@@ -54,9 +56,9 @@
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
- <max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
+ <max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
Modified: trunk/src/config/jboss-as/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/clustered/hornetq-configuration.xml 2010-03-24 15:46:47 UTC (rev 8958)
+++ trunk/src/config/jboss-as/clustered/hornetq-configuration.xml 2010-03-24 16:08:44 UTC (rev 8959)
@@ -9,7 +9,9 @@
<bindings-directory>${jboss.server.data.dir}/hornetq/bindings</bindings-directory>
<journal-directory>${jboss.server.data.dir}/hornetq/journal</journal-directory>
-
+
+ <journal-min-files>10</journal-min-files>
+
<large-messages-directory>${jboss.server.data.dir}/hornetq/largemessages</large-messages-directory>
<paging-directory>${jboss.server.data.dir}/hornetq/paging</paging-directory>
@@ -80,9 +82,9 @@
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
- <max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
+ <max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
Modified: trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml 2010-03-24 15:46:47 UTC (rev 8958)
+++ trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml 2010-03-24 16:08:44 UTC (rev 8959)
@@ -7,7 +7,9 @@
<bindings-directory>${jboss.server.data.dir}/hornetq/bindings</bindings-directory>
<journal-directory>${jboss.server.data.dir}/hornetq/journal</journal-directory>
-
+
+ <journal-min-files>10</journal-min-files>
+
<large-messages-directory>${jboss.server.data.dir}/hornetq/largemessages</large-messages-directory>
<paging-directory>${jboss.server.data.dir}/hornetq/paging</paging-directory>
@@ -54,9 +56,9 @@
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
- <max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
+ <max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
Modified: trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/clustered/hornetq-configuration.xml 2010-03-24 15:46:47 UTC (rev 8958)
+++ trunk/src/config/stand-alone/clustered/hornetq-configuration.xml 2010-03-24 16:08:44 UTC (rev 8959)
@@ -3,6 +3,16 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<clustered>true</clustered>
+
+ <paging-directory>${data.dir:../data}/paging</paging-directory>
+
+ <bindings-directory>${data.dir:../data}/bindings</bindings-directory>
+
+ <journal-directory>${data.dir:../data}/journal</journal-directory>
+
+ <journal-min-files>10</journal-min-files>
+
+ <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
<connectors>
<connector name="netty">
@@ -59,15 +69,12 @@
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
- <max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
+ <max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
- <paging-directory>${data.dir:../data}/paging</paging-directory>
- <bindings-directory>${data.dir:../data}/bindings</bindings-directory>
- <journal-directory>${data.dir:../data}/journal</journal-directory>
- <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
+
</configuration>
Modified: trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml 2010-03-24 15:46:47 UTC (rev 8958)
+++ trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml 2010-03-24 16:08:44 UTC (rev 8959)
@@ -2,6 +2,16 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+ <paging-directory>${data.dir:../data}/paging</paging-directory>
+
+ <bindings-directory>${data.dir:../data}/bindings</bindings-directory>
+
+ <journal-directory>${data.dir:../data}/journal</journal-directory>
+
+ <journal-min-files>10</journal-min-files>
+
+ <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
+
<connectors>
<connector name="netty">
<factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
@@ -33,15 +43,10 @@
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
- <max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
+ <max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
- <paging-directory>../data/paging</paging-directory>
- <bindings-directory>../data/bindings</bindings-directory>
- <journal-directory>../data/journal</journal-directory>
- <large-messages-directory>../data/large-messages</large-messages-directory>
-
</configuration>
Modified: trunk/src/config/trunk/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/clustered/hornetq-configuration.xml 2010-03-24 15:46:47 UTC (rev 8958)
+++ trunk/src/config/trunk/clustered/hornetq-configuration.xml 2010-03-24 16:08:44 UTC (rev 8959)
@@ -4,6 +4,8 @@
<clustered>true</clustered>
+ <journal-min-files>10</journal-min-files>
+
<connectors>
<connector name="netty">
<factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
@@ -53,16 +55,16 @@
</security-setting>
</security-settings>
- <address-settings>
- <!--default for catch all-->
- <address-setting match="#">
- <dead-letter-address>jms.queue.DLQ</dead-letter-address>
- <expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>0</redelivery-delay>
- <max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <message-counter-history-day-limit>10</message-counter-history-day-limit>
- </address-setting>
- </address-settings>
+ <address-settings>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>jms.queue.DLQ</dead-letter-address>
+ <expiry-address>jms.queue.ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <max-size-bytes>10485760</max-size-bytes>
+ <message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>BLOCK</address-full-policy>
+ </address-setting>
+ </address-settings>
</configuration>
Modified: trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/non-clustered/hornetq-configuration.xml 2010-03-24 15:46:47 UTC (rev 8958)
+++ trunk/src/config/trunk/non-clustered/hornetq-configuration.xml 2010-03-24 16:08:44 UTC (rev 8959)
@@ -2,6 +2,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+ <journal-min-files>10</journal-min-files>
+
<connectors>
<connector name="netty">
<factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
@@ -27,16 +29,16 @@
</security-setting>
</security-settings>
- <address-settings>
- <!--default for catch all-->
- <address-setting match="#">
- <dead-letter-address>jms.queue.DLQ</dead-letter-address>
- <expiry-address>jms.queue.ExpiryQueue</expiry-address>
- <redelivery-delay>0</redelivery-delay>
- <max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <message-counter-history-day-limit>10</message-counter-history-day-limit>
- </address-setting>
- </address-settings>
+ <address-settings>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>jms.queue.DLQ</dead-letter-address>
+ <expiry-address>jms.queue.ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <max-size-bytes>10485760</max-size-bytes>
+ <message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>BLOCK</address-full-policy>
+ </address-setting>
+ </address-settings>
</configuration>
14 years, 2 months