[hornetq-commits] JBoss hornetq SVN: r8961 - in branches/Clebert_TMP: src/main/org/hornetq/core and 19 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Mar 25 01:56:24 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-03-25 01:56:22 -0400 (Thu, 25 Mar 2010)
New Revision: 8961
Added:
branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/
branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedAddressSetting.java
branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedRoles.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedConnectionFactory.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/
branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
branches/Clebert_TMP/src/main/org/hornetq/utils/BufferHelper.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/AddressSettingsConfigurationStorageTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/JMSConnectionFactoryConfigurationStorageTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/RolesConfigurationStorageTest.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
Modified:
branches/Clebert_TMP/src/main/org/hornetq/api/core/management/HornetQServerControl.java
branches/Clebert_TMP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/Clebert_TMP/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Clebert_TMP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Clebert_TMP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Clebert_TMP/src/main/org/hornetq/core/settings/impl/AddressSettings.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Clebert_TMP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Uploading local copy to a branch
Modified: branches/Clebert_TMP/src/main/org/hornetq/api/core/management/HornetQServerControl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -524,7 +524,7 @@
AddressSettings getAddressSettings(String address);
- void removeAddressSettings(String addressMatch);
+ void removeAddressSettings(String addressMatch) throws Exception;
/**
* returns the address settings as a JSON string
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -37,6 +37,8 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.messagecounter.MessageCounterManager;
import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistconfig.PersistedRoles;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.server.RemotingService;
@@ -83,9 +85,10 @@
private final NotificationBroadcasterSupport broadcaster;
+ // private final Map
+
// Static --------------------------------------------------------
-
// Constructors --------------------------------------------------
public HornetQServerControlImpl(final PostOffice postOffice,
@@ -483,6 +486,7 @@
blockOnIO();
}
}
+
public void deployQueue(final String address, final String name, final String filterString) throws Exception
{
clearIO();
@@ -1052,7 +1056,7 @@
blockOnIO();
}
}
-
+
public void addSecuritySettings(String addressMatch,
String sendRoles,
String consumeRoles,
@@ -1060,14 +1064,31 @@
String deleteDurableQueueRoles,
String createTempQueueRoles,
String deleteTempQueueRoles,
- String manageRoles)
+ String manageRoles) throws Exception
{
clearIO();
try
{
- Set<Role> roles = SecurityFormatter.createSecurity(sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createTempQueueRoles, deleteTempQueueRoles, manageRoles);
+ Set<Role> roles = SecurityFormatter.createSecurity(sendRoles,
+ consumeRoles,
+ createDurableQueueRoles,
+ deleteDurableQueueRoles,
+ createTempQueueRoles,
+ deleteTempQueueRoles,
+ manageRoles);
- server.getSecurityRepository().addMatch(addressMatch, roles );
+ server.getSecurityRepository().addMatch(addressMatch, roles);
+
+ PersistedRoles persistedRoles = new PersistedRoles(addressMatch,
+ sendRoles,
+ consumeRoles,
+ createDurableQueueRoles,
+ deleteDurableQueueRoles,
+ createTempQueueRoles,
+ deleteTempQueueRoles,
+ manageRoles);
+
+ storageManager.storeSecurityRoles(persistedRoles);
}
finally
{
@@ -1075,13 +1096,13 @@
}
}
-
- public void removeSecuritySettings(String addressMatch)
+ public void removeSecuritySettings(String addressMatch) throws Exception
{
clearIO();
try
{
server.getSecurityRepository().removeMatch(addressMatch);
+ storageManager.deleteSecurityRoles(new SimpleString(addressMatch));
}
finally
{
@@ -1098,9 +1119,10 @@
}
finally
{
- blockOnIO();
+ blockOnIO();
}
}
+
public Object[] getRoles(String addressMatch) throws Exception
{
clearIO();
@@ -1154,11 +1176,11 @@
{
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(address);
Map<String, Object> settings = new HashMap<String, Object>();
- if(addressSettings.getDeadLetterAddress() != null)
+ if (addressSettings.getDeadLetterAddress() != null)
{
settings.put("DLA", addressSettings.getDeadLetterAddress());
}
- if(addressSettings.getExpiryAddress() != null)
+ if (addressSettings.getExpiryAddress() != null)
{
settings.put("expiryAddress", addressSettings.getExpiryAddress());
}
@@ -1169,14 +1191,16 @@
settings.put("redistributionDelay", addressSettings.getRedistributionDelay());
settings.put("lastValueQueue", addressSettings.isLastValueQueue());
settings.put("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute());
- String policy = addressSettings.getAddressFullMessagePolicy() == AddressFullMessagePolicy.PAGE?"PAGE":addressSettings.getAddressFullMessagePolicy() == AddressFullMessagePolicy.BLOCK?"BLOCK":"DROP";
+ String policy = addressSettings.getAddressFullMessagePolicy() == AddressFullMessagePolicy.PAGE ? "PAGE"
+ : addressSettings.getAddressFullMessagePolicy() == AddressFullMessagePolicy.BLOCK ? "BLOCK"
+ : "DROP";
settings.put("addressFullMessagePolicy", policy);
JSONObject jsonObject = new JSONObject(settings);
return jsonObject.toString();
}
- public void addAddressSettings(final String address,
+ public void addAddressSettings(final String address,
final String DLA,
final String expiryAddress,
final boolean lastValueQueue,
@@ -1189,8 +1213,8 @@
final String addressFullMessagePolicy) throws Exception
{
AddressSettings addressSettings = new AddressSettings();
- addressSettings.setDeadLetterAddress(DLA == null?null:new SimpleString(DLA));
- addressSettings.setExpiryAddress(expiryAddress == null?null:new SimpleString(expiryAddress));
+ addressSettings.setDeadLetterAddress(DLA == null ? null : new SimpleString(DLA));
+ addressSettings.setExpiryAddress(expiryAddress == null ? null : new SimpleString(expiryAddress));
addressSettings.setLastValueQueue(lastValueQueue);
addressSettings.setMaxDeliveryAttempts(deliveryAttempts);
addressSettings.setMaxSizeBytes(maxSizeBytes);
@@ -1198,23 +1222,25 @@
addressSettings.setRedeliveryDelay(redeliveryDelay);
addressSettings.setRedistributionDelay(redistributionDelay);
addressSettings.setSendToDLAOnNoRoute(sendToDLAOnNoRoute);
- if(addressFullMessagePolicy == null)
+ if (addressFullMessagePolicy == null)
{
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
}
- else if(addressFullMessagePolicy.equalsIgnoreCase("PAGE"))
+ else if (addressFullMessagePolicy.equalsIgnoreCase("PAGE"))
{
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
}
- else if(addressFullMessagePolicy.equalsIgnoreCase("DROP"))
+ else if (addressFullMessagePolicy.equalsIgnoreCase("DROP"))
{
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
}
- else if(addressFullMessagePolicy.equalsIgnoreCase("BLOCK"))
+ else if (addressFullMessagePolicy.equalsIgnoreCase("BLOCK"))
{
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
}
server.getAddressSettingsRepository().addMatch(address, addressSettings);
+
+ storageManager.storeAddressSetting(new PersistedAddressSetting(new SimpleString(address), addressSettings));
}
public AddressSettings getAddressSettings(final String address)
@@ -1222,9 +1248,10 @@
return server.getAddressSettingsRepository().getMatch(address);
}
- public void removeAddressSettings(String addressMatch)
+ public void removeAddressSettings(String addressMatch) throws Exception
{
server.getAddressSettingsRepository().removeMatch(addressMatch);
+ storageManager.deleteAddressSetting(new SimpleString(addressMatch));
}
public void sendQueueInfoToQueue(final String queueName, final String address) throws Exception
Added: branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedAddressSetting.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedAddressSetting.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedAddressSetting.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistconfig;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.settings.impl.AddressSettings;
+
+/**
+ * A PersistedAddressSetting
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class PersistedAddressSetting implements EncodingSupport
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long storeId;
+
+ private SimpleString addressMatch;
+
+ private AddressSettings setting;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PersistedAddressSetting()
+ {
+ super();
+ }
+
+ /**
+ * @param addressMatch
+ * @param setting
+ */
+ public PersistedAddressSetting(SimpleString addressMatch, AddressSettings setting)
+ {
+ super();
+ this.addressMatch = addressMatch;
+ this.setting = setting;
+ }
+
+ // Public --------------------------------------------------------
+
+ public void setStoreId(long id)
+ {
+ this.storeId = id;
+ }
+
+ public long getStoreId()
+ {
+ return storeId;
+ }
+
+ /**
+ * @return the addressMatch
+ */
+ public SimpleString getAddressMatch()
+ {
+ return addressMatch;
+ }
+
+ /**
+ * @return the setting
+ */
+ public AddressSettings getSetting()
+ {
+ return setting;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ addressMatch = buffer.readSimpleString();
+
+ setting = new AddressSettings();
+ setting.decode(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeSimpleString(addressMatch);
+
+ setting.encode(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return addressMatch.sizeof() + setting.getEncodeSize();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedRoles.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedRoles.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/persistconfig/PersistedRoles.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,316 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistconfig;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.EncodingSupport;
+
+/**
+ * A ConfiguredRoles
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PersistedRoles implements EncodingSupport
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long storeId;
+
+ private SimpleString addressMatch;
+
+ private SimpleString sendRoles;
+
+ private SimpleString consumeRoles;
+
+ private SimpleString createDurableQueueRoles;
+
+ private SimpleString deleteDurableQueueRoles;
+
+ private SimpleString createTempQueueRoles;
+
+ private SimpleString deleteTempQueueRoles;
+
+ private SimpleString manageRoles;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public PersistedRoles()
+ {
+ }
+
+ /**
+ * @param address
+ * @param addressMatch
+ * @param sendRoles
+ * @param consumeRoles
+ * @param createDurableQueueRoles
+ * @param deleteDurableQueueRoles
+ * @param createTempQueueRoles
+ * @param deleteTempQueueRoles
+ * @param manageRoles
+ */
+ public PersistedRoles(final String addressMatch,
+ final String sendRoles,
+ final String consumeRoles,
+ final String createDurableQueueRoles,
+ final String deleteDurableQueueRoles,
+ final String createTempQueueRoles,
+ final String deleteTempQueueRoles,
+ final String manageRoles)
+ {
+ super();
+ this.addressMatch = SimpleString.toSimpleString(addressMatch);
+ this.sendRoles = SimpleString.toSimpleString(sendRoles);
+ this.consumeRoles = SimpleString.toSimpleString(consumeRoles);
+ this.createDurableQueueRoles = SimpleString.toSimpleString(createDurableQueueRoles);
+ this.deleteDurableQueueRoles = SimpleString.toSimpleString(deleteDurableQueueRoles);
+ this.createTempQueueRoles = SimpleString.toSimpleString(createTempQueueRoles);
+ this.deleteTempQueueRoles = SimpleString.toSimpleString(deleteTempQueueRoles);
+ this.manageRoles = SimpleString.toSimpleString(manageRoles);
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getStoreId()
+ {
+ return storeId;
+ }
+
+ public void setStoreId(final long id)
+ {
+ storeId = id;
+ }
+
+ /**
+ * @return the addressMatch
+ */
+ public SimpleString getAddressMatch()
+ {
+ return addressMatch;
+ }
+
+ /**
+ * @return the sendRoles
+ */
+ public String getSendRoles()
+ {
+ return sendRoles.toString();
+ }
+
+ /**
+ * @return the consumeRoles
+ */
+ public String getConsumeRoles()
+ {
+ return consumeRoles.toString();
+ }
+
+ /**
+ * @return the createDurableQueueRoles
+ */
+ public String getCreateDurableQueueRoles()
+ {
+ return createDurableQueueRoles.toString();
+ }
+
+ /**
+ * @return the deleteDurableQueueRoles
+ */
+ public String getDeleteDurableQueueRoles()
+ {
+ return deleteDurableQueueRoles.toString();
+ }
+
+ /**
+ * @return the createTempQueueRoles
+ */
+ public String getCreateTempQueueRoles()
+ {
+ return createTempQueueRoles.toString();
+ }
+
+ /**
+ * @return the deleteTempQueueRoles
+ */
+ public String getDeleteTempQueueRoles()
+ {
+ return deleteTempQueueRoles.toString();
+ }
+
+ /**
+ * @return the manageRoles
+ */
+ public String getManageRoles()
+ {
+ return manageRoles.toString();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeSimpleString(addressMatch);
+ buffer.writeNullableSimpleString(sendRoles);
+ buffer.writeNullableSimpleString(consumeRoles);
+ buffer.writeNullableSimpleString(createDurableQueueRoles);
+ buffer.writeNullableSimpleString(deleteDurableQueueRoles);
+ buffer.writeNullableSimpleString(createTempQueueRoles);
+ buffer.writeNullableSimpleString(deleteTempQueueRoles);
+ buffer.writeNullableSimpleString(manageRoles);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return addressMatch.sizeof() + SimpleString.sizeofNullableString(sendRoles) +
+ SimpleString.sizeofNullableString(consumeRoles) +
+ SimpleString.sizeofNullableString(createDurableQueueRoles) +
+ SimpleString.sizeofNullableString(deleteDurableQueueRoles) +
+ SimpleString.sizeofNullableString(createTempQueueRoles) +
+ SimpleString.sizeofNullableString(deleteTempQueueRoles) +
+ SimpleString.sizeofNullableString(manageRoles);
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(final HornetQBuffer buffer)
+ {
+ addressMatch = buffer.readSimpleString();
+ sendRoles = buffer.readNullableSimpleString();
+ consumeRoles = buffer.readNullableSimpleString();
+ createDurableQueueRoles = buffer.readNullableSimpleString();
+ deleteDurableQueueRoles = buffer.readNullableSimpleString();
+ createTempQueueRoles = buffer.readNullableSimpleString();
+ deleteTempQueueRoles = buffer.readNullableSimpleString();
+ manageRoles = buffer.readNullableSimpleString();
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((addressMatch == null) ? 0 : addressMatch.hashCode());
+ result = prime * result + ((consumeRoles == null) ? 0 : consumeRoles.hashCode());
+ result = prime * result + ((createDurableQueueRoles == null) ? 0 : createDurableQueueRoles.hashCode());
+ result = prime * result + ((createTempQueueRoles == null) ? 0 : createTempQueueRoles.hashCode());
+ result = prime * result + ((deleteDurableQueueRoles == null) ? 0 : deleteDurableQueueRoles.hashCode());
+ result = prime * result + ((deleteTempQueueRoles == null) ? 0 : deleteTempQueueRoles.hashCode());
+ result = prime * result + ((manageRoles == null) ? 0 : manageRoles.hashCode());
+ result = prime * result + ((sendRoles == null) ? 0 : sendRoles.hashCode());
+ result = prime * result + (int)(storeId ^ (storeId >>> 32));
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PersistedRoles other = (PersistedRoles)obj;
+ if (addressMatch == null)
+ {
+ if (other.addressMatch != null)
+ return false;
+ }
+ else if (!addressMatch.equals(other.addressMatch))
+ return false;
+ if (consumeRoles == null)
+ {
+ if (other.consumeRoles != null)
+ return false;
+ }
+ else if (!consumeRoles.equals(other.consumeRoles))
+ return false;
+ if (createDurableQueueRoles == null)
+ {
+ if (other.createDurableQueueRoles != null)
+ return false;
+ }
+ else if (!createDurableQueueRoles.equals(other.createDurableQueueRoles))
+ return false;
+ if (createTempQueueRoles == null)
+ {
+ if (other.createTempQueueRoles != null)
+ return false;
+ }
+ else if (!createTempQueueRoles.equals(other.createTempQueueRoles))
+ return false;
+ if (deleteDurableQueueRoles == null)
+ {
+ if (other.deleteDurableQueueRoles != null)
+ return false;
+ }
+ else if (!deleteDurableQueueRoles.equals(other.deleteDurableQueueRoles))
+ return false;
+ if (deleteTempQueueRoles == null)
+ {
+ if (other.deleteTempQueueRoles != null)
+ return false;
+ }
+ else if (!deleteTempQueueRoles.equals(other.deleteTempQueueRoles))
+ return false;
+ if (manageRoles == null)
+ {
+ if (other.manageRoles != null)
+ return false;
+ }
+ else if (!manageRoles.equals(other.manageRoles))
+ return false;
+ if (sendRoles == null)
+ {
+ if (other.sendRoles != null)
+ return false;
+ }
+ else if (!sendRoles.equals(other.sendRoles))
+ return false;
+ if (storeId != other.storeId)
+ return false;
+ return true;
+ }
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/persistence/StorageManager.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/persistence/StorageManager.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -13,6 +13,7 @@
package org.hornetq.core.persistence;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -26,6 +27,8 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistconfig.PersistedRoles;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQComponent;
@@ -160,4 +163,16 @@
void addGrouping(GroupBinding groupBinding) throws Exception;
void deleteGrouping(GroupBinding groupBinding) throws Exception;
+
+ void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception;
+
+ void deleteAddressSetting(SimpleString addressMatch) throws Exception;
+
+ List<PersistedAddressSetting> recoverAddressSettings() throws Exception;
+
+ void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception;
+
+ void deleteSecurityRoles(SimpleString addressMatch) throws Exception;
+
+ List<PersistedRoles> recoverPersistedRoles() throws Exception;
}
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -20,6 +20,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.transaction.xa.Xid;
@@ -49,6 +50,8 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistconfig.PersistedRoles;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
@@ -101,6 +104,10 @@
public static final byte ID_COUNTER_RECORD = 24;
+ public static final byte ADDRESS_SETTING_RECORD = 25;
+
+ public static final byte SECURITY_RECORD = 26;
+
// type + expiration + timestamp + priority
public static final int SIZE_FIELDS = DataConstants.SIZE_INT + DataConstants.SIZE_LONG +
DataConstants.SIZE_LONG +
@@ -160,6 +167,12 @@
private final String journalDir;
private final String largeMessagesDirectory;
+
+
+ // Persisted core configuration
+ private final Map<SimpleString, PersistedRoles> mapPersistedRoles = new ConcurrentHashMap<SimpleString, PersistedRoles>();
+
+ private final Map<SimpleString, PersistedAddressSetting> mapPersistedAddressSettings = new ConcurrentHashMap<SimpleString, PersistedAddressSetting>();
public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory)
{
@@ -690,7 +703,69 @@
getContext(syncNonTransactional));
}
+
+
+ public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
+ {
+ deleteAddressSetting(addressSetting.getAddressMatch());
+ long id = idGenerator.generateID();
+ addressSetting.setStoreId(id);
+ bindingsJournal.appendAddRecord(id, ADDRESS_SETTING_RECORD, addressSetting, true);
+ mapPersistedAddressSettings.put(addressSetting.getAddressMatch(), addressSetting);
+ }
+
+ public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
+ {
+ ArrayList<PersistedAddressSetting> list = new ArrayList<PersistedAddressSetting>(mapPersistedAddressSettings.size());
+ list.addAll(mapPersistedAddressSettings.values());
+ return list;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#recoverPersistedRoles()
+ */
+ public List<PersistedRoles> recoverPersistedRoles() throws Exception
+ {
+ ArrayList<PersistedRoles> list = new ArrayList<PersistedRoles>(mapPersistedRoles.size());
+ list.addAll(mapPersistedRoles.values());
+ return list;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeSecurityRoles(org.hornetq.core.persistconfig.PersistedRoles)
+ */
+ public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception
+ {
+
+ deleteSecurityRoles(persistedRoles.getAddressMatch());
+ long id = idGenerator.generateID();
+ persistedRoles.setStoreId(id);
+ bindingsJournal.appendAddRecord(id, SECURITY_RECORD, persistedRoles, true);
+ mapPersistedRoles.put(persistedRoles.getAddressMatch(), persistedRoles);
+ }
+
+ public void deleteAddressSetting(SimpleString addressMatch) throws Exception
+ {
+ PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch);
+ if (oldSetting != null)
+ {
+ bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false);
+ }
+
+ }
+
+ public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
+ {
+ PersistedRoles oldRoles = mapPersistedRoles.remove(addressMatch);
+ if (oldRoles != null)
+ {
+ bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false);
+ }
+ }
+
+
+
public JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
final PagingManager pagingManager,
final ResourceManager resourceManager,
@@ -1028,6 +1103,20 @@
encoding.setId(id);
groupingInfos.add(encoding);
}
+ else if (rec == JournalStorageManager.ADDRESS_SETTING_RECORD)
+ {
+ PersistedAddressSetting setting = new PersistedAddressSetting();
+ setting.decode(buffer);
+ setting.setStoreId(id);
+ mapPersistedAddressSettings.put(setting.getAddressMatch(), setting);
+ }
+ else if (rec == JournalStorageManager.SECURITY_RECORD)
+ {
+ PersistedRoles roles = new PersistedRoles();
+ roles.decode(buffer);
+ roles.setStoreId(id);
+ mapPersistedRoles.put(roles.getAddressMatch(), roles);
+ }
else
{
throw new IllegalStateException("Invalid record type " + rec);
@@ -2051,5 +2140,4 @@
}
-
}
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -13,6 +13,8 @@
package org.hornetq.core.persistence.impl.nullpm;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -29,6 +31,8 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistconfig.PersistedRoles;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
@@ -388,4 +392,48 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#recoverAddressSettings()
+ */
+ public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
+ {
+ return Collections.emptyList();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeAddressSetting(org.hornetq.core.persistconfig.PersistedAddressSetting)
+ */
+ public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#recoverPersistedRoles()
+ */
+ public List<PersistedRoles> recoverPersistedRoles() throws Exception
+ {
+ return Collections.emptyList();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeSecurityRoles(org.hornetq.core.persistconfig.PersistedRoles)
+ */
+ public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteAddressSetting(org.hornetq.api.core.SimpleString)
+ */
+ public void deleteAddressSetting(SimpleString addressMatch) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteSecurityRoles(org.hornetq.api.core.SimpleString)
+ */
+ public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
+ {
+ }
+
}
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -76,11 +76,9 @@
private final HornetQServer server;
private Channel channel;
+
+ private Journal[] journals;
- private Journal bindingsJournal;
-
- private Journal messagingJournal;
-
private JournalStorageManager storage;
private PagingManager pageManager;
@@ -101,6 +99,26 @@
}
// Public --------------------------------------------------------
+
+ public void registerJournal(final byte id, final Journal journal)
+ {
+ if (journals == null || id >= journals.length)
+ {
+ Journal[] oldJournals = journals;
+ journals = new Journal[id + 1];
+
+ if (oldJournals != null)
+ {
+ for (int i = 0 ; i < oldJournals.length; i++)
+ {
+ journals[i] = oldJournals[i];
+ }
+ }
+ }
+
+ journals[id] = journal;
+ }
+
/*
* (non-Javadoc)
* @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
@@ -194,8 +212,8 @@
server.getManagementService().setStorageManager(storage);
- bindingsJournal = storage.getBindingsJournal();
- messagingJournal = storage.getMessageJournal();
+ registerJournal((byte)1, storage.getMessageJournal());
+ registerJournal((byte)0, storage.getBindingsJournal());
// We only need to load internal structures on the backup...
journalLoadInformation = storage.loadInternalOnly();
@@ -591,16 +609,7 @@
*/
private Journal getJournal(final byte journalID)
{
- Journal journalToUse;
- if (journalID == (byte)0)
- {
- journalToUse = bindingsJournal;
- }
- else
- {
- journalToUse = messagingJournal;
- }
- return journalToUse;
+ return this.journals[journalID];
}
// Inner classes -------------------------------------------------
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -59,6 +59,8 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistconfig.PersistedRoles;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
@@ -111,6 +113,7 @@
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.SecurityFormatter;
import org.hornetq.utils.UUID;
import org.hornetq.utils.UUIDGenerator;
import org.hornetq.utils.VersionLoader;
@@ -258,7 +261,7 @@
addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
addressSettingsRepository.setDefault(new AddressSettings());
-
+
securityRepository = new HierarchicalObjectRepository<Set<Role>>();
securityRepository.setDefault(new HashSet<Role>());
@@ -434,9 +437,9 @@
{
memoryManager.stop();
}
-
+
addressSettingsRepository.clear();
-
+
securityRepository.clear();
pagingManager = null;
@@ -554,7 +557,7 @@
final boolean xa,
final SessionCallback callback) throws Exception
{
-
+
if (securityStore != null)
{
securityStore.authenticate(username, password);
@@ -692,7 +695,11 @@
if (queue.getConsumerCount() != 0)
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot delete queue " + queue.getName() + " on binding " + queueName + " - it has consumers = " + binding.getClass().getName());
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot delete queue " + queue.getName() +
+ " on binding " +
+ queueName +
+ " - it has consumers = " +
+ binding.getClass().getName());
}
if (session != null)
@@ -975,7 +982,7 @@
configuration.isBackup());
// Address settings need to deployed initially, since they're require on paging manager.start()
-
+
deployAddressSettingsFromConfiguration();
if (configuration.isFileDeploymentEnabled())
@@ -1119,7 +1126,6 @@
}
}
-
private JournalLoadInformation[] loadJournals() throws Exception
{
JournalLoadInformation[] journalInfo = new JournalLoadInformation[2];
@@ -1130,6 +1136,8 @@
journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
+ recoverStoredConfigs();
+
// Set the node id - must be before we load the queues into the postoffice, but after we load the journal
setNodeID();
@@ -1189,6 +1197,33 @@
return journalInfo;
}
+ /**
+ * @throws Exception
+ */
+ private void recoverStoredConfigs() throws Exception
+ {
+ List<PersistedAddressSetting> adsettings = storageManager.recoverAddressSettings();
+ for (PersistedAddressSetting set : adsettings)
+ {
+ addressSettingsRepository.addMatch(set.getAddressMatch().toString(), set.getSetting());
+ }
+
+ List<PersistedRoles> roles = storageManager.recoverPersistedRoles();
+
+ for (PersistedRoles roleItem : roles)
+ {
+ Set<Role> setRoles = SecurityFormatter.createSecurity(roleItem.getSendRoles(),
+ roleItem.getConsumeRoles(),
+ roleItem.getCreateDurableQueueRoles(),
+ roleItem.getDeleteDurableQueueRoles(),
+ roleItem.getCreateTempQueueRoles(),
+ roleItem.getDeleteTempQueueRoles(),
+ roleItem.getManageRoles());
+
+ securityRepository.addMatch(roleItem.getAddressMatch().toString(), setRoles);
+ }
+ }
+
private void setNodeID() throws Exception
{
if (!configuration.isBackup())
Modified: branches/Clebert_TMP/src/main/org/hornetq/core/settings/impl/AddressSettings.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -15,9 +15,13 @@
import java.io.Serializable;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.settings.Mergeable;
+import org.hornetq.utils.BufferHelper;
+import org.hornetq.utils.DataConstants;
/**
* Configuration settings that are applied on the address level
@@ -25,7 +29,7 @@
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
*/
-public class AddressSettings implements Mergeable<AddressSettings>, Serializable
+public class AddressSettings implements Mergeable<AddressSettings>, Serializable, EncodingSupport
{
private static final long serialVersionUID = 1607502280582336366L;
@@ -240,4 +244,250 @@
}
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ SimpleString policyStr = buffer.readNullableSimpleString();
+
+ if (policyStr != null)
+ {
+ addressFullMessagePolicy = AddressFullMessagePolicy.valueOf(policyStr.toString());
+ }
+ else
+ {
+ addressFullMessagePolicy = null;
+ }
+
+ maxSizeBytes = BufferHelper.readNullableLong(buffer);
+
+ pageSizeBytes = BufferHelper.readNullableInteger(buffer);
+
+ dropMessagesWhenFull = BufferHelper.readNullableBoolean(buffer);
+
+ maxDeliveryAttempts = BufferHelper.readNullableInteger(buffer);
+
+ messageCounterHistoryDayLimit = BufferHelper.readNullableInteger(buffer);
+
+ redeliveryDelay = BufferHelper.readNullableLong(buffer);
+
+ deadLetterAddress = buffer.readNullableSimpleString();
+
+ expiryAddress = buffer.readNullableSimpleString();
+
+ lastValueQueue = BufferHelper.readNullableBoolean(buffer);
+
+ redeliveryDelay = BufferHelper.readNullableLong(buffer);
+
+ sendToDLAOnNoRoute = BufferHelper.readNullableBoolean(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+
+ return BufferHelper.sizeOfNullableSimpleString(addressFullMessagePolicy != null ? addressFullMessagePolicy.toString()
+ : null) + BufferHelper.sizeOfNullableLong(maxSizeBytes) +
+ BufferHelper.sizeOfNullableInteger(pageSizeBytes) +
+ BufferHelper.sizeOfNullableBoolean(dropMessagesWhenFull) +
+ BufferHelper.sizeOfNullableInteger(maxDeliveryAttempts) +
+ BufferHelper.sizeOfNullableInteger(messageCounterHistoryDayLimit) +
+ BufferHelper.sizeOfNullableLong(redeliveryDelay) +
+ SimpleString.sizeofNullableString(deadLetterAddress) +
+ SimpleString.sizeofNullableString(expiryAddress) +
+ BufferHelper.sizeOfNullableBoolean(lastValueQueue) +
+ BufferHelper.sizeOfNullableLong(redistributionDelay) +
+ BufferHelper.sizeOfNullableBoolean(sendToDLAOnNoRoute);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeNullableSimpleString(addressFullMessagePolicy != null ? new SimpleString(addressFullMessagePolicy.toString())
+ : null);
+
+ BufferHelper.writeNullableLong(buffer, maxSizeBytes);
+
+ BufferHelper.writeNullableInteger(buffer, pageSizeBytes);
+
+ BufferHelper.writeNullableBoolean(buffer, dropMessagesWhenFull);
+
+ BufferHelper.writeNullableInteger(buffer, maxDeliveryAttempts);
+
+ BufferHelper.writeNullableInteger(buffer, messageCounterHistoryDayLimit);
+
+ BufferHelper.writeNullableLong(buffer, redeliveryDelay);
+
+ buffer.writeNullableSimpleString(deadLetterAddress);
+
+ buffer.writeNullableSimpleString(expiryAddress);
+
+ BufferHelper.writeNullableBoolean(buffer, lastValueQueue);
+
+ BufferHelper.writeNullableLong(buffer, redistributionDelay);
+
+ BufferHelper.writeNullableBoolean(buffer, sendToDLAOnNoRoute);
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((addressFullMessagePolicy == null) ? 0 : addressFullMessagePolicy.hashCode());
+ result = prime * result + ((deadLetterAddress == null) ? 0 : deadLetterAddress.hashCode());
+ result = prime * result + ((dropMessagesWhenFull == null) ? 0 : dropMessagesWhenFull.hashCode());
+ result = prime * result + ((expiryAddress == null) ? 0 : expiryAddress.hashCode());
+ result = prime * result + ((lastValueQueue == null) ? 0 : lastValueQueue.hashCode());
+ result = prime * result + ((maxDeliveryAttempts == null) ? 0 : maxDeliveryAttempts.hashCode());
+ result = prime * result + ((maxSizeBytes == null) ? 0 : maxSizeBytes.hashCode());
+ result = prime * result +
+ ((messageCounterHistoryDayLimit == null) ? 0 : messageCounterHistoryDayLimit.hashCode());
+ result = prime * result + ((pageSizeBytes == null) ? 0 : pageSizeBytes.hashCode());
+ result = prime * result + ((redeliveryDelay == null) ? 0 : redeliveryDelay.hashCode());
+ result = prime * result + ((redistributionDelay == null) ? 0 : redistributionDelay.hashCode());
+ result = prime * result + ((sendToDLAOnNoRoute == null) ? 0 : sendToDLAOnNoRoute.hashCode());
+ return result;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ AddressSettings other = (AddressSettings)obj;
+ if (addressFullMessagePolicy == null)
+ {
+ if (other.addressFullMessagePolicy != null)
+ return false;
+ }
+ else if (!addressFullMessagePolicy.equals(other.addressFullMessagePolicy))
+ return false;
+ if (deadLetterAddress == null)
+ {
+ if (other.deadLetterAddress != null)
+ return false;
+ }
+ else if (!deadLetterAddress.equals(other.deadLetterAddress))
+ return false;
+ if (dropMessagesWhenFull == null)
+ {
+ if (other.dropMessagesWhenFull != null)
+ return false;
+ }
+ else if (!dropMessagesWhenFull.equals(other.dropMessagesWhenFull))
+ return false;
+ if (expiryAddress == null)
+ {
+ if (other.expiryAddress != null)
+ return false;
+ }
+ else if (!expiryAddress.equals(other.expiryAddress))
+ return false;
+ if (lastValueQueue == null)
+ {
+ if (other.lastValueQueue != null)
+ return false;
+ }
+ else if (!lastValueQueue.equals(other.lastValueQueue))
+ return false;
+ if (maxDeliveryAttempts == null)
+ {
+ if (other.maxDeliveryAttempts != null)
+ return false;
+ }
+ else if (!maxDeliveryAttempts.equals(other.maxDeliveryAttempts))
+ return false;
+ if (maxSizeBytes == null)
+ {
+ if (other.maxSizeBytes != null)
+ return false;
+ }
+ else if (!maxSizeBytes.equals(other.maxSizeBytes))
+ return false;
+ if (messageCounterHistoryDayLimit == null)
+ {
+ if (other.messageCounterHistoryDayLimit != null)
+ return false;
+ }
+ else if (!messageCounterHistoryDayLimit.equals(other.messageCounterHistoryDayLimit))
+ return false;
+ if (pageSizeBytes == null)
+ {
+ if (other.pageSizeBytes != null)
+ return false;
+ }
+ else if (!pageSizeBytes.equals(other.pageSizeBytes))
+ return false;
+ if (redeliveryDelay == null)
+ {
+ if (other.redeliveryDelay != null)
+ return false;
+ }
+ else if (!redeliveryDelay.equals(other.redeliveryDelay))
+ return false;
+ if (redistributionDelay == null)
+ {
+ if (other.redistributionDelay != null)
+ return false;
+ }
+ else if (!redistributionDelay.equals(other.redistributionDelay))
+ return false;
+ if (sendToDLAOnNoRoute == null)
+ {
+ if (other.sendToDLAOnNoRoute != null)
+ return false;
+ }
+ else if (!sendToDLAOnNoRoute.equals(other.sendToDLAOnNoRoute))
+ return false;
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "AddressSettings [addressFullMessagePolicy=" + addressFullMessagePolicy +
+ ", deadLetterAddress=" +
+ deadLetterAddress +
+ ", dropMessagesWhenFull=" +
+ dropMessagesWhenFull +
+ ", expiryAddress=" +
+ expiryAddress +
+ ", lastValueQueue=" +
+ lastValueQueue +
+ ", maxDeliveryAttempts=" +
+ maxDeliveryAttempts +
+ ", maxSizeBytes=" +
+ maxSizeBytes +
+ ", messageCounterHistoryDayLimit=" +
+ messageCounterHistoryDayLimit +
+ ", pageSizeBytes=" +
+ pageSizeBytes +
+ ", redeliveryDelay=" +
+ redeliveryDelay +
+ ", redistributionDelay=" +
+ redistributionDelay +
+ ", sendToDLAOnNoRoute=" +
+ sendToDLAOnNoRoute +
+ "]";
+ }
+
}
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/JMSStorageManager.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.persistence;
+
+import java.util.List;
+
+import org.hornetq.core.server.HornetQComponent;
+
+/**
+ * A JMSPersistence
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface JMSStorageManager extends HornetQComponent
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ void storeDestination(PersistedDestination destination);
+
+ List<PersistedDestination> recoverDestinations();
+
+ void deleteConnectionFactory(String connectionFactory) throws Exception;
+
+ void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws Exception;
+
+ List<PersistedConnectionFactory> recoverConnectionFactories();
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedConnectionFactory.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedConnectionFactory.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedConnectionFactory.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.persistence;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
+
+/**
+ * A PersistedConnectionFactory
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PersistedConnectionFactory implements EncodingSupport
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long id;
+
+ private ConnectionFactoryConfiguration config;
+
+ public PersistedConnectionFactory()
+ {
+ super();
+ }
+
+ /**
+ * @param id
+ * @param config
+ */
+ public PersistedConnectionFactory(final ConnectionFactoryConfiguration config)
+ {
+ super();
+ this.config = config;
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * @return the id
+ */
+ public long getId()
+ {
+ return id;
+ }
+
+ public void setId(final long id)
+ {
+ this.id = id;
+ }
+
+ public String getName()
+ {
+ return config.getName();
+ }
+
+ /**
+ * @return the config
+ */
+ public ConnectionFactoryConfiguration getConfig()
+ {
+ return config;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(final HornetQBuffer buffer)
+ {
+ config = new ConnectionFactoryConfigurationImpl();
+ config.decode(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ config.encode(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return config.getEncodeSize();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/PersistedDestination.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.persistence;
+
+/**
+ * A PersistedDestination
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PersistedDestination
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/journal/JournalJMSStorageManagerImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.persistence.impl.journal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.replication.impl.ReplicatedJournal;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.jms.persistence.JMSStorageManager;
+import org.hornetq.jms.persistence.PersistedConnectionFactory;
+import org.hornetq.jms.persistence.PersistedDestination;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.utils.IDGenerator;
+
+/**
+ * A JournalJMSStorageManagerImpl
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalJMSStorageManagerImpl implements JMSStorageManager
+{
+
+ // Constants -----------------------------------------------------
+
+ private final byte CF_RECORD = 1;
+
+ // Attributes ----------------------------------------------------
+
+ private final IDGenerator idGenerator;
+
+ private final String bindingsDir;
+
+ private final boolean createBindingsDir;
+
+ private final Journal jmsJournal;
+
+ private volatile boolean started;
+
+ private Map<String, PersistedConnectionFactory> mapFactories = new ConcurrentHashMap<String, PersistedConnectionFactory>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+ public JournalJMSStorageManagerImpl(final IDGenerator idGenerator,
+ final Configuration config,
+ final ReplicationManager replicator)
+ {
+ if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO)
+ {
+ throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals");
+ }
+
+ bindingsDir = config.getBindingsDirectory();
+
+ if (bindingsDir == null)
+ {
+ throw new NullPointerException("bindings-dir is null");
+ }
+
+ createBindingsDir = config.isCreateBindingsDir();
+
+ SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(bindingsDir);
+
+ Journal localJMS = new JournalImpl(1024 * 1024,
+ 2,
+ config.getJournalCompactMinFiles(),
+ config.getJournalCompactPercentage(),
+ bindingsJMS,
+ "hornetq-jms-config",
+ "jms",
+ 1);
+
+ if (replicator != null)
+ {
+ jmsJournal = new ReplicatedJournal((byte)2, localJMS, replicator);
+ }
+ else
+ {
+ jmsJournal = localJMS;
+ }
+
+ this.idGenerator = idGenerator;
+ }
+
+
+ // Public --------------------------------------------------------
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#recoverConnectionFactories()
+ */
+ public List<PersistedConnectionFactory> recoverConnectionFactories()
+ {
+ List<PersistedConnectionFactory> cfs = new ArrayList<PersistedConnectionFactory>(mapFactories.size());
+ cfs.addAll(mapFactories.values());
+ return cfs;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#storeConnectionFactory(org.hornetq.jms.persistence.PersistedConnectionFactory)
+ */
+ public void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws Exception
+ {
+ deleteConnectionFactory(connectionFactory.getName());
+ long id = idGenerator.generateID();
+ connectionFactory.setId(id);
+ jmsJournal.appendAddRecord(id, CF_RECORD, connectionFactory, true);
+ mapFactories.put(connectionFactory.getName(), connectionFactory);
+ }
+
+ public void deleteConnectionFactory(String cfName) throws Exception
+ {
+ PersistedConnectionFactory oldCF = mapFactories.remove(cfName);
+ if (oldCF != null)
+ {
+ jmsJournal.appendDeleteRecord(oldCF.getId(), false);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#recoverDestinations()
+ */
+ public List<PersistedDestination> recoverDestinations()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#storeDestination(org.hornetq.jms.persistence.PersistedDestination)
+ */
+ public void storeDestination(PersistedDestination destination)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ */
+ public boolean isStarted()
+ {
+ return started;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#start()
+ */
+ public void start() throws Exception
+ {
+
+ checkAndCreateDir(bindingsDir, createBindingsDir);
+
+ jmsJournal.start();
+
+ load();
+
+ started = true;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#stop()
+ */
+ public void stop() throws Exception
+ {
+ jmsJournal.stop();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private void load() throws Exception
+ {
+ mapFactories.clear();
+
+ List<RecordInfo> data = new ArrayList<RecordInfo>();
+
+ ArrayList<PreparedTransactionInfo> list = new ArrayList<PreparedTransactionInfo>();
+
+ jmsJournal.load(data, list, null);
+
+ for (RecordInfo record : data)
+ {
+ long id = record.id;
+
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(record.data);
+
+ byte rec = record.getUserRecordType();
+
+ if (rec == CF_RECORD)
+ {
+ PersistedConnectionFactory cf = new PersistedConnectionFactory();
+ cf.decode(buffer);
+ cf.setId(id);
+ mapFactories.put(cf.getName(), cf);
+ }
+ else
+ {
+ throw new IllegalStateException("Invalid record type " + rec);
+ }
+
+ }
+
+ }
+
+ private void checkAndCreateDir(final String dir, final boolean create)
+ {
+ File f = new File(dir);
+
+ if (!f.exists())
+ {
+ if (create)
+ {
+ if (!f.mkdirs())
+ {
+ throw new IllegalStateException("Failed to create directory " + dir);
+ }
+ }
+ else
+ {
+ throw new IllegalArgumentException("Directory " + dir + " does not exist and will not create it");
+ }
+ }
+ }
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/persistence/impl/nullpm/NullJMSStorageManagerImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.persistence.impl.nullpm;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.hornetq.jms.persistence.JMSStorageManager;
+import org.hornetq.jms.persistence.PersistedConnectionFactory;
+import org.hornetq.jms.persistence.PersistedDestination;
+
+/**
+ * A NullJMSStorageManagerImpl
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class NullJMSStorageManagerImpl implements JMSStorageManager
+{
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#deleteConnectionFactory(java.lang.String)
+ */
+ public void deleteConnectionFactory(String connectionFactory) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#recoverConnectionFactories()
+ */
+ public List<PersistedConnectionFactory> recoverConnectionFactories()
+ {
+ return Collections.emptyList();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#recoverDestinations()
+ */
+ public List<PersistedDestination> recoverDestinations()
+ {
+ return Collections.emptyList();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#storeConnectionFactory(org.hornetq.jms.persistence.PersistedConnectionFactory)
+ */
+ public void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.persistence.JMSStorageManager#storeDestination(org.hornetq.jms.persistence.PersistedDestination)
+ */
+ public void storeDestination(PersistedDestination destination)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ */
+ public boolean isStarted()
+ {
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#start()
+ */
+ public void start() throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#stop()
+ */
+ public void stop() throws Exception
+ {
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -17,6 +17,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.jms.server.JMSServerManager;
/**
@@ -26,7 +27,7 @@
*
*
*/
-public interface ConnectionFactoryConfiguration
+public interface ConnectionFactoryConfiguration extends EncodingSupport
{
String getName();
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -14,12 +14,16 @@
package org.hornetq.jms.server.config.impl;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.utils.BufferHelper;
+import org.hornetq.utils.DataConstants;
/**
* A ConnectionFactoryConfigurationImpl
@@ -34,10 +38,10 @@
// Attributes ----------------------------------------------------
- private final String[] bindings;
+ private String name;
- private final String name;
-
+ private String[] bindings;
+
private String discoveryGroupName;
private String discoveryAddress;
@@ -45,7 +49,7 @@
private int discoveryPort;
private List<Pair<String, String>> connectorNames;
-
+
private List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs;
private String clientID = null;
@@ -112,6 +116,11 @@
// Constructors --------------------------------------------------
+ /** To be used on persistence only */
+ public ConnectionFactoryConfigurationImpl()
+ {
+ }
+
public ConnectionFactoryConfigurationImpl(final String name,
final String discoveryAddress,
final int discoveryPort,
@@ -527,9 +536,306 @@
public void setDiscoveryGroupName(String groupName)
{
this.discoveryGroupName = groupName;
+
+ }
+
+ // Encoding Support Implementation --------------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ name = buffer.readSimpleString().toString();
+
+ int nbindings = buffer.readInt();
+
+ bindings = new String[nbindings];
+
+ for (int i = 0; i < nbindings; i++)
+ {
+ bindings[i] = buffer.readSimpleString().toString();
+ }
+
+ discoveryGroupName = BufferHelper.readNullableSimpleStringAsString(buffer);
+
+ discoveryAddress = BufferHelper.readNullableSimpleStringAsString(buffer);
+
+ discoveryPort = buffer.readInt();
+
+ int nConnectors = buffer.readInt();
+
+ connectorNames = new ArrayList<Pair<String, String>>(nConnectors);
+
+ for (int i = 0; i < nConnectors; i++)
+ {
+ String a = BufferHelper.readNullableSimpleStringAsString(buffer);
+
+ String b = BufferHelper.readNullableSimpleStringAsString(buffer);
+
+ connectorNames.add(new Pair<String, String>(a, b));
+ }
+
+ clientID = BufferHelper.readNullableSimpleStringAsString(buffer);
+
+ discoveryRefreshTimeout = buffer.readLong();
+
+ clientFailureCheckPeriod = buffer.readLong();
+
+ connectionTTL = buffer.readLong();
+
+ callTimeout = buffer.readLong();
+
+ cacheLargeMessagesClient = buffer.readBoolean();
+
+ minLargeMessageSize = buffer.readInt();
+
+ consumerWindowSize = buffer.readInt();
+
+ consumerMaxRate = buffer.readInt();
+
+ confirmationWindowSize = buffer.readInt();
+
+ producerWindowSize = buffer.readInt();
+
+ producerMaxRate = buffer.readInt();
+
+ blockOnAcknowledge = buffer.readBoolean();
+
+ blockOnDurableSend = buffer.readBoolean();
+
+ blockOnNonDurableSend = buffer.readBoolean();
+
+ autoGroup = buffer.readBoolean();
+
+ preAcknowledge = buffer.readBoolean();
+
+ loadBalancingPolicyClassName = buffer.readSimpleString().toString();
+
+ transactionBatchSize = buffer.readInt();
+
+ dupsOKBatchSize = buffer.readInt();
+
+ initialWaitTimeout = buffer.readLong();
+
+ useGlobalPools = buffer.readBoolean();
+
+ scheduledThreadPoolMaxSize = buffer.readInt();
+
+ threadPoolMaxSize = buffer.readInt();
+
+ retryInterval = buffer.readLong();
+
+ retryIntervalMultiplier = buffer.readDouble();
+
+ maxRetryInterval = buffer.readLong();
+
+ reconnectAttempts = buffer.readInt();
+
+ failoverOnServerShutdown = buffer.readBoolean();
+
+ groupID = BufferHelper.readNullableSimpleStringAsString(buffer);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ BufferHelper.writeAsSimpleString(buffer, name);
+
+ buffer.writeInt(bindings.length);
+
+ for (String str : bindings)
+ {
+ BufferHelper.writeAsSimpleString(buffer, str);
+ }
+
+ BufferHelper.writeAsNullableSimpleString(buffer, discoveryGroupName);
+
+ BufferHelper.writeAsNullableSimpleString(buffer, discoveryAddress);
+
+ buffer.writeInt(discoveryPort);
+
+ buffer.writeInt(connectorNames == null ? 0 : connectorNames.size());
+
+ if (connectorNames != null)
+ {
+ for (Pair<String, String> namePair : connectorNames)
+ {
+ BufferHelper.writeAsNullableSimpleString(buffer, namePair.a);
+ BufferHelper.writeAsNullableSimpleString(buffer, namePair.b);
+ }
+ }
+
+ BufferHelper.writeAsNullableSimpleString(buffer, clientID);
+
+ buffer.writeLong(discoveryRefreshTimeout);
+
+ buffer.writeLong(clientFailureCheckPeriod);
+
+ buffer.writeLong(connectionTTL);
+
+ buffer.writeLong(callTimeout);
+
+ buffer.writeBoolean(cacheLargeMessagesClient);
+
+ buffer.writeInt(minLargeMessageSize);
+
+ buffer.writeInt(consumerWindowSize);
+
+ buffer.writeInt(consumerMaxRate);
+
+ buffer.writeInt(confirmationWindowSize);
+
+ buffer.writeInt(producerWindowSize);
+
+ buffer.writeInt(producerMaxRate);
+
+ buffer.writeBoolean(blockOnAcknowledge);
+
+ buffer.writeBoolean(blockOnDurableSend);
+
+ buffer.writeBoolean(blockOnNonDurableSend);
+
+ buffer.writeBoolean(autoGroup);
+
+ buffer.writeBoolean(preAcknowledge);
+
+ BufferHelper.writeAsSimpleString(buffer, loadBalancingPolicyClassName);
+
+ buffer.writeInt(transactionBatchSize);
+
+ buffer.writeInt(dupsOKBatchSize);
+
+ buffer.writeLong(initialWaitTimeout);
+
+ buffer.writeBoolean(useGlobalPools);
+
+ buffer.writeInt(scheduledThreadPoolMaxSize);
+
+ buffer.writeInt(threadPoolMaxSize);
+
+ buffer.writeLong(retryInterval);
+
+ buffer.writeDouble(retryIntervalMultiplier);
+
+ buffer.writeLong(maxRetryInterval);
+
+ buffer.writeInt(reconnectAttempts);
+
+ buffer.writeBoolean(failoverOnServerShutdown);
+
+ BufferHelper.writeAsNullableSimpleString(buffer, groupID);
+ }
+
+ private int sizeOfBindings()
+ {
+ int size = DataConstants.SIZE_INT; // for the number of bindings persisted
+
+ for (String str : bindings)
+ {
+ size += BufferHelper.sizeOfSimpleString(str);
+ }
+
+ return size;
+
+ }
+
+ private int sizeOfConnectors()
+ {
+ int size = DataConstants.SIZE_INT; // for the number of connectors persisted
+
+ if (connectorNames != null)
+ {
+ for (Pair<String, String> pair : connectorNames)
+ {
+ size += BufferHelper.sizeOfNullableSimpleString(pair.a);
+ size += BufferHelper.sizeOfNullableSimpleString(pair.b);
+ }
+ }
+
+ return size;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return BufferHelper.sizeOfSimpleString(name) +
+ sizeOfBindings() +
+
+ BufferHelper.sizeOfNullableSimpleString(discoveryGroupName) +
+
+ BufferHelper.sizeOfNullableSimpleString(discoveryAddress)+
+
+ DataConstants.SIZE_INT + // discoveryPort
+
+ sizeOfConnectors() +
+
+ BufferHelper.sizeOfNullableSimpleString(clientID) +
+
+ DataConstants.SIZE_LONG + // discoveryRefreshTimeout
+
+ DataConstants.SIZE_LONG + // clientFailureCheckPeriod
+
+ DataConstants.SIZE_LONG + // connectionTTL
+
+ DataConstants.SIZE_LONG + // callTimeout
+
+ DataConstants.SIZE_BOOLEAN + // cacheLargeMessagesClient
+
+ DataConstants.SIZE_INT + // minLargeMessageSize
+
+ DataConstants.SIZE_INT + // consumerWindowSize
+
+ DataConstants.SIZE_INT + // consumerMaxRate
+
+ DataConstants.SIZE_INT + // confirmationWindowSize
+
+ DataConstants.SIZE_INT + // producerWindowSize
+
+ DataConstants.SIZE_INT + // producerMaxRate
+
+ DataConstants.SIZE_BOOLEAN + // blockOnAcknowledge
+
+ DataConstants.SIZE_BOOLEAN + // blockOnDurableSend
+
+ DataConstants.SIZE_BOOLEAN + // blockOnNonDurableSend
+
+ DataConstants.SIZE_BOOLEAN + // autoGroup
+
+ DataConstants.SIZE_BOOLEAN + // preAcknowledge
+
+ BufferHelper.sizeOfSimpleString(loadBalancingPolicyClassName) +
+
+ DataConstants.SIZE_INT + // transactionBatchSize
+
+ DataConstants.SIZE_INT + // dupsOKBatchSize
+
+ DataConstants.SIZE_LONG + // initialWaitTimeout
+
+ DataConstants.SIZE_BOOLEAN + // useGlobalPools
+
+ DataConstants.SIZE_INT + // scheduledThreadPoolMaxSize
+
+ DataConstants.SIZE_INT + // threadPoolMaxSize
+
+ DataConstants.SIZE_LONG + // retryInterval
+
+ DataConstants.SIZE_DOUBLE + // retryIntervalMultiplier
+
+ DataConstants.SIZE_LONG + // maxRetryInterval
+
+ DataConstants.SIZE_INT + // reconnectAttempts
+
+ DataConstants.SIZE_BOOLEAN + // failoverOnServerShutdown
+
+ BufferHelper.sizeOfNullableSimpleString(groupID);
}
-
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -42,6 +42,10 @@
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.SelectorTranslator;
+import org.hornetq.jms.persistence.JMSStorageManager;
+import org.hornetq.jms.persistence.PersistedConnectionFactory;
+import org.hornetq.jms.persistence.impl.journal.JournalJMSStorageManagerImpl;
+import org.hornetq.jms.persistence.impl.nullpm.NullJMSStorageManagerImpl;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.JMSConfiguration;
@@ -49,6 +53,7 @@
import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.management.JMSManagementService;
import org.hornetq.jms.server.management.impl.JMSManagementServiceImpl;
+import org.hornetq.utils.TimeAndCounterIDGenerator;
/**
* A Deployer used to create and add to JNDI queues, topics and connection
@@ -100,11 +105,15 @@
private boolean contextSet;
private JMSConfiguration config;
+
+ private Configuration coreConfig;
+
+ private JMSStorageManager storage;
public JMSServerManagerImpl(final HornetQServer server) throws Exception
{
this.server = server;
-
+
configFileName = null;
}
@@ -157,6 +166,8 @@
{
deploy();
}
+
+ initJournal();
}
catch (Exception e)
{
@@ -601,7 +612,17 @@
public synchronized void createConnectionFactory(final ConnectionFactoryConfiguration cfConfig) throws Exception
{
+ internalCreateCF(cfConfig);
+ storage.storeConnectionFactory(new PersistedConnectionFactory(cfConfig));
+ }
+ /**
+ * @param cfConfig
+ * @throws HornetQException
+ * @throws Exception
+ */
+ private void internalCreateCF(final ConnectionFactoryConfiguration cfConfig) throws HornetQException, Exception
+ {
ArrayList<String> listBindings = new ArrayList<String>();
for (String str : cfConfig.getBindings())
{
@@ -683,7 +704,6 @@
cfConfig.getGroupID(),
listBindings);
}
-
}
public synchronized void createConnectionFactory(final String name,
@@ -950,6 +970,35 @@
}
/**
+ * @param server
+ */
+ private void initJournal() throws Exception
+ {
+ this.coreConfig = server.getConfiguration();
+
+ if (coreConfig.isPersistenceEnabled())
+ {
+ // TODO: replication
+ storage = new JournalJMSStorageManagerImpl(new TimeAndCounterIDGenerator(), coreConfig, null);
+ }
+ else
+ {
+ storage = new NullJMSStorageManagerImpl();
+ }
+
+ storage.start();
+
+
+ List<PersistedConnectionFactory> cfs = storage.recoverConnectionFactories();
+
+ for (PersistedConnectionFactory cf : cfs)
+ {
+ internalCreateCF(cf.getConfig());
+ }
+ }
+
+
+ /**
* @param cfConfig
* @return
* @throws HornetQException
Added: branches/Clebert_TMP/src/main/org/hornetq/utils/BufferHelper.java
===================================================================
--- branches/Clebert_TMP/src/main/org/hornetq/utils/BufferHelper.java (rev 0)
+++ branches/Clebert_TMP/src/main/org/hornetq/utils/BufferHelper.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+
+/**
+ * Helper methods to read and write from HornetQBuffer.
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class BufferHelper
+{
+
+ /** Size of a String as if it was a Nullable Simple String */
+ public static int sizeOfNullableSimpleString(String str)
+ {
+ if (str == null)
+ {
+ return DataConstants.SIZE_BOOLEAN;
+ }
+ else
+ {
+ return DataConstants.SIZE_BOOLEAN + sizeOfSimpleString(str);
+ }
+ }
+
+ /** Size of a String as it if was a Simple String*/
+ public static int sizeOfSimpleString(String str)
+ {
+ return DataConstants.SIZE_INT + str.length() * 2;
+ }
+
+ public static void writeAsNullableSimpleString(HornetQBuffer buffer, String str)
+ {
+ buffer.writeNullableSimpleString(SimpleString.toSimpleString(str));
+ }
+
+ public static String readNullableSimpleStringAsString(HornetQBuffer buffer)
+ {
+ SimpleString str = buffer.readNullableSimpleString();
+ return str != null ? str.toString() : null;
+ }
+
+ public static void writeAsSimpleString(HornetQBuffer buffer, String str)
+ {
+ buffer.writeSimpleString(new SimpleString(str));
+ }
+
+ /**
+ * @param buffer
+ */
+ public static void writeNullableBoolean(HornetQBuffer buffer, Boolean value)
+ {
+ buffer.writeBoolean(value != null);
+
+ if (value != null)
+ {
+ buffer.writeBoolean(value.booleanValue());
+ }
+ }
+
+ public static int sizeOfNullableBoolean(Boolean value)
+ {
+ return DataConstants.SIZE_BOOLEAN + (value != null ? DataConstants.SIZE_BOOLEAN : 0);
+ }
+
+ public static Boolean readNullableBoolean(HornetQBuffer buffer)
+ {
+ boolean isNotNull = buffer.readBoolean();
+
+ if (isNotNull)
+ {
+ return buffer.readBoolean();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /**
+ * @param buffer
+ */
+ public static void writeNullableLong(HornetQBuffer buffer, Long value)
+ {
+ buffer.writeBoolean(value != null);
+
+ if (value != null)
+ {
+ buffer.writeLong(value.longValue());
+ }
+ }
+
+ public static int sizeOfNullableLong(Long value)
+ {
+ return DataConstants.SIZE_BOOLEAN + (value != null ? DataConstants.SIZE_LONG : 0);
+ }
+
+ public static Long readNullableLong(HornetQBuffer buffer)
+ {
+ boolean isNotNull = buffer.readBoolean();
+
+ if (isNotNull)
+ {
+ return buffer.readLong();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /**
+ * @param buffer
+ */
+ public static void writeNullableInteger(HornetQBuffer buffer, Integer value)
+ {
+ buffer.writeBoolean(value != null);
+
+ if (value != null)
+ {
+ buffer.writeInt(value.intValue());
+ }
+ }
+
+ public static int sizeOfNullableInteger(Integer value)
+ {
+ return DataConstants.SIZE_BOOLEAN + (value != null ? DataConstants.SIZE_INT : 0);
+ }
+
+ public static Integer readNullableInteger(HornetQBuffer buffer)
+ {
+ boolean isNotNull = buffer.readBoolean();
+
+ if (isNotNull)
+ {
+ return buffer.readInt();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+}
Added: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/AddressSettingsConfigurationStorageTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/AddressSettingsConfigurationStorageTest.java (rev 0)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/AddressSettingsConfigurationStorageTest.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+
+/**
+ * A ConfigurationStorageTest
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class AddressSettingsConfigurationStorageTest extends StorageManagerTestBase
+{
+
+ private Map<SimpleString, PersistedAddressSetting> mapExpectedAddresses;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ mapExpectedAddresses = new HashMap<SimpleString, PersistedAddressSetting>();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ mapExpectedAddresses = null;
+
+ super.tearDown();
+ }
+
+ protected void addAddress(JournalStorageManager journal, String address, AddressSettings setting) throws Exception
+ {
+ SimpleString str = new SimpleString(address);
+ PersistedAddressSetting persistedSetting = new PersistedAddressSetting(str, setting);
+ mapExpectedAddresses.put(str, persistedSetting);
+ journal.storeAddressSetting(persistedSetting);
+ }
+
+ public void testStoreSecuritySettings() throws Exception
+ {
+ createStorage();
+
+ AddressSettings setting = new AddressSettings();
+
+ setting = new AddressSettings();
+
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+
+ setting.setDeadLetterAddress(new SimpleString("some-test"));
+
+ addAddress(journal, "a2", setting);
+
+ journal.stop();
+
+ createStorage();
+
+ checkAddresses(journal);
+
+ setting = new AddressSettings();
+
+ setting.setDeadLetterAddress(new SimpleString("new-adddress"));
+
+ // Replacing the first setting
+ addAddress(journal, "a1", setting);
+
+ journal.stop();
+
+ createStorage();
+
+ checkAddresses(journal);
+
+ journal.stop();
+
+ journal = null;
+
+ }
+
+ /**
+ * @param journal
+ * @throws Exception
+ */
+ private void checkAddresses(JournalStorageManager journal) throws Exception
+ {
+ List<PersistedAddressSetting> listSetting = journal.recoverAddressSettings();
+
+ assertEquals(mapExpectedAddresses.size(), listSetting.size());
+
+ for (PersistedAddressSetting el : listSetting)
+ {
+ PersistedAddressSetting el2 = mapExpectedAddresses.get(el.getAddressMatch());
+
+ assertEquals(el.getAddressMatch(), el2.getAddressMatch());
+ assertEquals(el.getSetting(), el2.getSetting());
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/JMSConnectionFactoryConfigurationStorageTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/JMSConnectionFactoryConfigurationStorageTest.java (rev 0)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/JMSConnectionFactoryConfigurationStorageTest.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.jms.persistence.PersistedConnectionFactory;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
+
+/**
+ * A JMSConnectionFactoryConfigurationStorageTest
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JMSConnectionFactoryConfigurationStorageTest extends StorageManagerTestBase
+{
+
+ private Map<String, PersistedConnectionFactory> mapExpectedCFs;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ mapExpectedCFs = new HashMap<String, PersistedConnectionFactory>();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ mapExpectedCFs = null;
+
+ super.tearDown();
+ }
+
+ protected void addSetting(PersistedConnectionFactory setting) throws Exception
+ {
+ mapExpectedCFs.put(setting.getName(), setting);
+ jmsJournal.storeConnectionFactory(setting);
+ }
+
+ public void testSettings() throws Exception
+ {
+
+ createJMSStorage();
+
+ String str[] = new String[5];
+ for (int i = 0; i < 5; i++)
+ {
+ str[i] = "str" + i;
+ }
+
+ ConnectionFactoryConfiguration config = new ConnectionFactoryConfigurationImpl("some-name", str);
+
+ addSetting(new PersistedConnectionFactory(config));
+
+ jmsJournal.stop();
+
+ createJMSStorage();
+
+ List<PersistedConnectionFactory> cfs = jmsJournal.recoverConnectionFactories();
+
+ assertEquals(1, cfs.size());
+
+ assertEquals("some-name", cfs.get(0).getName());
+
+ assertEquals(5, cfs.get(0).getConfig().getBindings().length);
+
+ for (int i = 0; i < 5; i++)
+ {
+ assertEquals("str" + i, cfs.get(0).getConfig().getBindings()[i]);
+ }
+ }
+
+ public void testSizeOfCF() throws Exception
+ {
+
+ String str[] = new String[5];
+ for (int i = 0; i < 5; i++)
+ {
+ str[i] = "str" + i;
+ }
+
+ ConnectionFactoryConfiguration config = new ConnectionFactoryConfigurationImpl("some-name", str);
+
+ int size = config.getEncodeSize();
+
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(size);
+
+ config.encode(buffer);
+
+ assertEquals(size, buffer.writerIndex());
+
+ PersistedConnectionFactory persistedCF = new PersistedConnectionFactory(config);
+
+ size = persistedCF.getEncodeSize();
+
+ buffer = HornetQBuffers.fixedBuffer(size);
+
+ persistedCF.encode(buffer);
+
+ assertEquals(size, buffer.writerIndex());
+
+ }
+
+ /**
+ * @param journal
+ * @throws Exception
+ */
+ private void checkSettings() throws Exception
+ {
+ List<PersistedConnectionFactory> listSetting = jmsJournal.recoverConnectionFactories();
+
+ assertEquals(mapExpectedCFs.size(), listSetting.size());
+
+ for (PersistedConnectionFactory el : listSetting)
+ {
+ PersistedConnectionFactory el2 = mapExpectedCFs.get(el.getName());
+
+ assertEquals(el, el2);
+ }
+ }
+
+}
Added: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/RolesConfigurationStorageTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/RolesConfigurationStorageTest.java (rev 0)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/RolesConfigurationStorageTest.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.persistconfig.PersistedRoles;
+
+/**
+ * A ConfigurationStorageTest
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class RolesConfigurationStorageTest extends StorageManagerTestBase
+{
+
+ private Map<SimpleString, PersistedRoles> mapExpectedSets;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ mapExpectedSets = new HashMap<SimpleString, PersistedRoles>();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ mapExpectedSets = null;
+
+ super.tearDown();
+ }
+
+ protected void addSetting(PersistedRoles setting) throws Exception
+ {
+ mapExpectedSets.put(setting.getAddressMatch(), setting);
+ journal.storeSecurityRoles(setting);
+ }
+
+ public void testStoreSecuritySettings() throws Exception
+ {
+ createStorage();
+
+
+ addSetting(new PersistedRoles("a#",
+ "a1",
+ "a1",
+ "a1",
+ "a1",
+ "a1",
+ "a1",
+ "a1"));
+
+
+ addSetting(new PersistedRoles("a2",
+ "a1",
+ null,
+ "a1",
+ "a1",
+ "a1",
+ "a1",
+ "a1"));
+
+ journal.stop();
+
+ checkSettings();
+
+ createStorage();
+
+ checkSettings();
+
+ addSetting(new PersistedRoles("a2",
+ "a1",
+ null,
+ "a1",
+ "a1",
+ "a1",
+ "a1",
+ "a1"));
+
+ addSetting(new PersistedRoles("a3",
+ "a1",
+ null,
+ "a1",
+ "a1",
+ "a1",
+ "a1",
+ "a1"));
+
+ checkSettings();
+
+ journal.stop();
+
+ createStorage();
+
+ checkSettings();
+
+ journal.stop();
+
+ journal = null;
+
+ }
+
+ /**
+ * @param journal
+ * @throws Exception
+ */
+ private void checkSettings() throws Exception
+ {
+ List<PersistedRoles> listSetting = journal.recoverPersistedRoles();
+
+ assertEquals(mapExpectedSets.size(), listSetting.size());
+
+ for (PersistedRoles el : listSetting)
+ {
+ PersistedRoles el2 = mapExpectedSets.get(el.getAddressMatch());
+
+ assertEquals(el, el2);
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java (rev 0)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.QueueBindingInfo;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.Queue;
+import org.hornetq.jms.persistence.JMSStorageManager;
+import org.hornetq.jms.persistence.impl.journal.JournalJMSStorageManagerImpl;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
+import org.hornetq.utils.TimeAndCounterIDGenerator;
+
+/**
+ * A StorageManagerTestBase
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class StorageManagerTestBase extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ protected ExecutorService executor;
+
+ protected ExecutorFactory execFactory;
+
+ protected JournalStorageManager journal;
+
+ protected JMSStorageManager jmsJournal;
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ executor = Executors.newCachedThreadPool();
+
+ execFactory = new OrderedExecutorFactory(executor);
+
+ File testdir = new File(getTestDir());
+
+ deleteDirectory(testdir);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ executor.shutdown();
+
+ if (journal != null)
+ {
+ try
+ {
+ journal.stop();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(); // >> junit report
+ }
+
+ journal = null;
+ }
+
+ if (jmsJournal != null)
+ {
+ try
+ {
+ jmsJournal.stop();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(); // >> junit report
+ }
+
+ jmsJournal = null;
+ }
+
+ super.tearDown();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ protected void createStorage() throws Exception
+ {
+ Configuration configuration = createDefaultConfig();
+
+ configuration.setJournalType(JournalType.ASYNCIO);
+
+ journal = new JournalStorageManager(configuration, execFactory);
+
+ journal.start();
+
+ journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
+
+ Map<Long, Queue> queues = new HashMap<Long, Queue>();
+
+ journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null);
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ protected void createJMSStorage() throws Exception
+ {
+ Configuration configuration = createDefaultConfig();
+
+ configuration.setJournalType(JournalType.ASYNCIO);
+
+ jmsJournal = new JournalJMSStorageManagerImpl(new TimeAndCounterIDGenerator(), configuration, null);
+
+ jmsJournal.start();
+ }
+
+
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Clebert_TMP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Clebert_TMP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-03-25 05:53:03 UTC (rev 8960)
+++ branches/Clebert_TMP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-03-25 05:56:22 UTC (rev 8961)
@@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -48,6 +49,8 @@
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
+import org.hornetq.core.persistconfig.PersistedAddressSetting;
+import org.hornetq.core.persistconfig.PersistedRoles;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
@@ -1307,10 +1310,52 @@
public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
{
- // TODO Auto-generated method stub
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#recoverAddressSettings()
+ */
+ public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
+ {
+ return Collections.emptyList();
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#recoverPersistedRoles()
+ */
+ public List<PersistedRoles> recoverPersistedRoles() throws Exception
+ {
+ return Collections.emptyList();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeAddressSetting(org.hornetq.core.persistconfig.PersistedAddressSetting)
+ */
+ public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#storeSecurityRoles(org.hornetq.core.persistconfig.PersistedRoles)
+ */
+ public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteAddressSetting(org.hornetq.api.core.SimpleString)
+ */
+ public void deleteAddressSetting(SimpleString addressMatch) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deleteSecurityRoles(org.hornetq.api.core.SimpleString)
+ */
+ public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
+ {
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
More information about the hornetq-commits
mailing list