[jboss-cvs] JBoss Messaging SVN: r6036 - in trunk: src/main/org/jboss/messaging/core/config and 11 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Mar 7 10:30:27 EST 2009
Author: timfox
Date: 2009-03-07 10:30:27 -0500 (Sat, 07 Mar 2009)
New Revision: 6036
Modified:
trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java
trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverExpiredMessageTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java
Log:
various fixes etc
Modified: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -22,9 +22,7 @@
package org.jboss.messaging.core.cluster.impl;
-import java.io.ByteArrayInputStream;
import java.io.InterruptedIOException;
-import java.io.ObjectInputStream;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
@@ -34,10 +32,12 @@
import java.util.List;
import java.util.Map;
+import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.cluster.DiscoveryGroup;
import org.jboss.messaging.core.cluster.DiscoveryListener;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.Pair;
/**
@@ -223,20 +223,18 @@
continue;
}
}
+
+ MessagingBuffer buffer = ChannelBuffers.wrappedBuffer(data);
+
+ String originatingNodeID = buffer.readString();
- ByteArrayInputStream bis = new ByteArrayInputStream(data);
-
- ObjectInputStream ois = new ObjectInputStream(bis);
-
- String originatingNodeID = ois.readUTF();
-
if (nodeID.equals(originatingNodeID))
{
// Ignore traffic from own node
continue;
}
- int size = ois.readInt();
+ int size = buffer.readInt();
boolean changed = false;
@@ -244,15 +242,19 @@
{
for (int i = 0; i < size; i++)
{
- TransportConfiguration connector = (TransportConfiguration)ois.readObject();
+ TransportConfiguration connector = new TransportConfiguration();
+
+ connector.decode(buffer);
- boolean existsBackup = ois.readBoolean();
+ boolean existsBackup = buffer.readBoolean();
TransportConfiguration backupConnector = null;
if (existsBackup)
{
- backupConnector = (TransportConfiguration)ois.readObject();
+ backupConnector = new TransportConfiguration();
+
+ backupConnector.decode(buffer);
}
Pair<TransportConfiguration, TransportConfiguration> connectorPair = new Pair<TransportConfiguration, TransportConfiguration>(connector,
Modified: trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -22,9 +22,12 @@
package org.jboss.messaging.core.config;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
/**
* A TransportConfiguration
*
@@ -35,73 +38,191 @@
{
private static final long serialVersionUID = -3994528421527392679L;
- private final String name;
-
- private final String factoryClassName;
-
- private final Map<String, Object> params;
-
+ private String name;
+
+ private String factoryClassName;
+
+ private Map<String, Object> params;
+
+ private static final byte TYPE_BOOLEAN = 0;
+
+ private static final byte TYPE_INT = 1;
+
+ private static final byte TYPE_LONG = 2;
+
+ private static final byte TYPE_STRING = 3;
+
+ public void encode(final MessagingBuffer buffer)
+ {
+ buffer.writeString(name);
+ buffer.writeString(factoryClassName);
+
+ buffer.writeInt(params == null ? 0 : params.size());
+
+ if (params != null)
+ {
+ for (Map.Entry<String, Object> entry : params.entrySet())
+ {
+ buffer.writeString(entry.getKey());
+
+ Object val = entry.getValue();
+
+ if (val instanceof Boolean)
+ {
+ buffer.writeByte(TYPE_BOOLEAN);
+ buffer.writeBoolean((Boolean)val);
+ }
+ else if (val instanceof Integer)
+ {
+ buffer.writeByte(TYPE_INT);
+ buffer.writeInt((Integer)val);
+ }
+ else if (val instanceof Long)
+ {
+ buffer.writeByte(TYPE_LONG);
+ buffer.writeLong((Long)val);
+ }
+ else if (val instanceof String)
+ {
+ buffer.writeByte(TYPE_STRING);
+ buffer.writeString((String)val);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid type " + val);
+ }
+ }
+ }
+ }
+
+ public void decode(final MessagingBuffer buffer)
+ {
+ name = buffer.readString();
+ factoryClassName = buffer.readString();
+
+ int num = buffer.readInt();
+
+ if (params == null)
+ {
+ if (num > 0)
+ {
+ params = new HashMap<String, Object>();
+ }
+ }
+ else
+ {
+ params.clear();
+ }
+
+ for (int i = 0; i < num; i++)
+ {
+ String key = buffer.readString();
+
+ byte type = buffer.readByte();
+
+ Object val;
+
+ switch (type)
+ {
+ case TYPE_BOOLEAN:
+ {
+ val = buffer.readBoolean();
+
+ break;
+ }
+ case TYPE_INT:
+ {
+ val = buffer.readInt();
+
+ break;
+ }
+ case TYPE_LONG:
+ {
+ val = buffer.readLong();
+
+ break;
+ }
+ case TYPE_STRING:
+ {
+ val = buffer.readString();
+
+ break;
+ }
+ default:
+ {
+ throw new IllegalArgumentException("Invalid type " + type);
+ }
+ }
+
+ params.put(key, val);
+ }
+ }
+
public static String[] splitHosts(final String commaSeparatedHosts)
- {
+ {
if (commaSeparatedHosts == null)
{
return new String[0];
}
String[] hosts = commaSeparatedHosts.split(",");
-
+
for (int i = 0; i < hosts.length; i++)
{
- hosts[i] = hosts[i].trim();
+ hosts[i] = hosts[i].trim();
}
- return hosts;
+ return hosts;
}
-
+
+ public TransportConfiguration()
+ {
+ }
+
public TransportConfiguration(final String className, final Map<String, Object> params, final String name)
{
this.factoryClassName = className;
-
+
this.params = params;
-
+
this.name = name;
}
-
+
public TransportConfiguration(final String className, final Map<String, Object> params)
{
this(className, params, UUID.randomUUID().toString());
}
-
+
public TransportConfiguration(final String className)
{
this(className, null, UUID.randomUUID().toString());
}
-
+
public String getName()
{
return name;
}
-
+
public String getFactoryClassName()
{
return factoryClassName;
}
-
+
public Map<String, Object> getParams()
{
return params;
}
-
+
public int hashCode()
{
return factoryClassName.hashCode();
}
-
+
public boolean equals(final Object other)
{
if (other instanceof TransportConfiguration == false)
{
return false;
}
-
+
TransportConfiguration kother = (TransportConfiguration)other;
if (factoryClassName.equals(kother.factoryClassName))
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -189,7 +189,17 @@
// Injecting the postoffice (itself) on queueFactory for paging-control
queueFactory.setPostOffice(this);
-
+
+ if (!backup)
+ {
+ startExpiryScanner();
+ }
+
+ started = true;
+ }
+
+ private void startExpiryScanner()
+ {
if (messageExpiryScanPeriod > 0)
{
MessageExpiryRunner messageExpiryRunner = new MessageExpiryRunner();
@@ -201,7 +211,6 @@
messageExpiryScanPeriod,
TimeUnit.MILLISECONDS);
}
- started = true;
}
public void stop() throws Exception
@@ -669,6 +678,8 @@
}
}
}
+
+ startExpiryScanner();
return queues;
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -358,6 +358,7 @@
public String getRemoteAddress()
{
+ log.info("transport connection is " + transportConnection);
return transportConnection.getRemoteAddress();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -147,6 +147,10 @@
public static final byte SESS_REPLICATE_DELIVERY = 91;
+ public static final byte REPLICATE_UPDATE_CONNECTORS = 92;
+
+ public static final byte REPLICATE_NOTIFICATION = 93;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -16,6 +16,7 @@
import java.util.Set;
import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.management.ManagementService;
import org.jboss.messaging.core.management.MessagingServerControlMBean;
import org.jboss.messaging.core.persistence.StorageManager;
@@ -32,6 +33,7 @@
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.SimpleString;
import org.jboss.messaging.utils.UUID;
@@ -97,6 +99,8 @@
boolean xa,
int sendWindowSize) throws Exception;
+ void updateClusterConnectionConnectors(SimpleString clusterConnectionName, List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception;
+
void removeSession(String name) throws Exception;
ServerSession getSession(String name);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -23,7 +23,11 @@
package org.jboss.messaging.core.server.cluster;
+import java.util.List;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.SimpleString;
/**
@@ -38,4 +42,6 @@
public interface ClusterConnection extends MessagingComponent
{
SimpleString getName();
+
+ void handleReplicatedUpdateConnectors(List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -26,6 +26,7 @@
import java.util.Set;
import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.utils.SimpleString;
/**
* A ClusterManager
@@ -38,6 +39,8 @@
public interface ClusterManager extends MessagingComponent
{
Map<String, Bridge> getBridges();
-
+
Set<ClusterConnection> getClusterConnections();
+
+ ClusterConnection getClusterConnection(SimpleString name);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -23,7 +23,6 @@
package org.jboss.messaging.core.server.cluster.impl;
import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
@@ -31,8 +30,10 @@
import java.util.List;
import java.util.concurrent.ScheduledFuture;
+import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.cluster.BroadcastGroup;
import org.jboss.messaging.utils.Pair;
@@ -94,11 +95,12 @@
if (localPort != -1)
{
socket = new DatagramSocket(localPort);
- } else
+ }
+ else
{
- socket = new DatagramSocket();
+ socket = new DatagramSocket();
}
-
+
started = true;
}
@@ -146,36 +148,30 @@
public synchronized void broadcastConnectors() throws Exception
{
- // TODO - for now we just use plain serialization to serialize the transport configs
- // we should use our own format for a tighter representation
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ MessagingBuffer buff = ChannelBuffers.dynamicBuffer(4096);
+
+ buff.writeString(nodeID);
- ObjectOutputStream oos = new ObjectOutputStream(bos);
-
- oos.writeUTF(nodeID);
+ buff.writeInt(connectorPairs.size());
- oos.writeInt(connectorPairs.size());
-
for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectorPairs)
{
- oos.writeObject(connectorPair.a);
+ connectorPair.a.encode(buff);
if (connectorPair.b != null)
{
- oos.writeBoolean(true);
+ buff.writeBoolean(true);
- oos.writeObject(connectorPair.b);
+ connectorPair.b.encode(buff);
}
else
{
- oos.writeBoolean(false);
+ buff.writeBoolean(false);
}
}
-
- oos.flush();
-
- byte[] data = bos.toByteArray();
-
+
+ byte[] data = buff.array();
+
DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
socket.send(packet);
@@ -194,13 +190,13 @@
}
catch (Exception e)
{
- log.error("Failed to broadcast connector configs");
+ log.error("Failed to broadcast connector configs", e);
}
}
public synchronized void setScheduledFuture(final ScheduledFuture<?> future)
{
this.future = future;
- }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -46,6 +46,9 @@
import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateClusterConnectionUpdate;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.cluster.Bridge;
@@ -106,6 +109,10 @@
private final int maxHops;
private final UUID nodeUUID;
+
+ private final Channel replicatingChannel;
+
+ private boolean backup;
private volatile boolean started;
@@ -128,7 +135,9 @@
final QueueFactory queueFactory,
final List<Pair<TransportConfiguration, TransportConfiguration>> connectors,
final int maxHops,
- final UUID nodeUUID) throws Exception
+ final UUID nodeUUID,
+ final Channel replicatingChannel,
+ final boolean backup) throws Exception
{
this.name = name;
@@ -163,6 +172,10 @@
this.maxHops = maxHops;
this.nodeUUID = nodeUUID;
+
+ this.replicatingChannel = replicatingChannel;
+
+ this.backup = backup;
this.updateConnectors(connectors);
}
@@ -186,7 +199,9 @@
final QueueFactory queueFactory,
final DiscoveryGroup discoveryGroup,
final int maxHops,
- final UUID nodeUUID) throws Exception
+ final UUID nodeUUID,
+ final Channel replicatingChannel,
+ final boolean backup) throws Exception
{
this.name = name;
@@ -221,6 +236,10 @@
this.maxHops = maxHops;
this.nodeUUID = nodeUUID;
+
+ this.replicatingChannel = replicatingChannel;
+
+ this.backup = backup;
}
public synchronized void start() throws Exception
@@ -267,7 +286,17 @@
{
return name;
}
-
+
+ public synchronized void handleReplicatedUpdateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+ {
+ if (!backup)
+ {
+ return;
+ }
+
+ updateConnectors(connectors);
+ }
+
// DiscoveryListener implementation ------------------------------------------------------------------
public synchronized void connectorsChanged()
@@ -283,9 +312,38 @@
log.error("Failed to update connectors", e);
}
}
-
+
private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
{
+ if (replicatingChannel == null)
+ {
+ doUpdateConnectors(connectors);
+ }
+ else
+ {
+ Packet packet = new ReplicateClusterConnectionUpdate(name, connectors);
+
+ Runnable action = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ doUpdateConnectors(connectors);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to update connectors", e);
+ }
+ }
+ };
+
+ replicatingChannel.replicatePacket(packet, 1, action);
+ }
+ }
+
+ private void doUpdateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+ {
Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
connectorSet.addAll(connectors);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -207,6 +207,11 @@
{
return new HashSet<ClusterConnection>(clusters.values());
}
+
+ public ClusterConnection getClusterConnection(final SimpleString name)
+ {
+ return clusters.get(name.toString());
+ }
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
{
@@ -475,7 +480,9 @@
queueFactory,
connectors,
config.getMaxHops(),
- nodeUUID);
+ nodeUUID,
+ null,
+ false);
}
else
{
@@ -503,7 +510,9 @@
queueFactory,
dg,
config.getMaxHops(),
- nodeUUID);
+ nodeUUID,
+ null,
+ false);
}
managementService.registerCluster(clusterConnection, config);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -67,6 +67,7 @@
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.server.cluster.ClusterConnection;
import org.jboss.messaging.core.server.cluster.ClusterManager;
import org.jboss.messaging.core.server.cluster.Transformer;
import org.jboss.messaging.core.server.cluster.impl.ClusterManagerImpl;
@@ -716,6 +717,19 @@
return sessions.get(name);
}
+ public void updateClusterConnectionConnectors(final SimpleString clusterConnectionName,
+ final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+ {
+ ClusterConnection cc = clusterManager.getClusterConnection(clusterConnectionName);
+
+ if (cc == null)
+ {
+ throw new IllegalStateException("Cannot find cluster connection with name " + clusterConnectionName);
+ }
+
+ cc.handleReplicatedUpdateConnectors(connectors);
+ }
+
public List<ServerSession> getSessions(final String connectionID)
{
Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();
@@ -1023,7 +1037,7 @@
return new CreateSessionResponseMessage(version.getIncrementingVersion());
}
-
+
private Transformer instantiateTransformer(final String transformerClassName)
{
Transformer transformer = null;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -15,6 +15,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_CREATESESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_UPDATE_CONNECTORS;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
@@ -26,6 +27,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReplicateCreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateClusterConnectionUpdate;
import org.jboss.messaging.core.server.MessagingServer;
/**
@@ -88,6 +90,15 @@
break;
}
+ case REPLICATE_UPDATE_CONNECTORS:
+ {
+ ReplicateClusterConnectionUpdate request = (ReplicateClusterConnectionUpdate)packet;
+
+ handleClusterConnectionUpdate(request);
+
+ break;
+
+ }
default:
{
log.error("Invalid packet " + packet);
@@ -227,5 +238,17 @@
channel1.send(response);
}
+
+ private void handleClusterConnectionUpdate(final ReplicateClusterConnectionUpdate request)
+ {
+ try
+ {
+ server.updateClusterConnectionConnectors(request.getClusterConnectionName(), request.getConnectors());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle cluster connection update", e);
+ }
+ }
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -81,6 +81,7 @@
if (active)
{
listener.connectionDestroyed(e.getChannel().getId());
+
active = false;
}
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -22,6 +22,19 @@
package org.jboss.messaging.integration.transports.netty;
+import static org.jboss.netty.channel.Channels.pipeline;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Timer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.net.ssl.SSLContext;
+
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
@@ -42,30 +55,16 @@
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
-import static org.jboss.netty.channel.Channels.pipeline;
-
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalAddress;
-import org.jboss.netty.channel.local.LocalServerChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.ssl.SslHandler;
-import javax.net.ssl.SSLContext;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Map;
-import java.util.Timer;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
/**
* A Netty TCP Acceptor that supports SSL
*
@@ -271,7 +270,7 @@
}
ChannelPipelineSupport.addCodecFilter(pipeline, handler);
- pipeline.addLast("handler", new MessagingServerChannelHandler(channelGroup, handler, listener));
+ pipeline.addLast("handler", new MessagingServerChannelHandler(channelGroup, handler, new Listener()));
return pipeline;
}
};
@@ -355,7 +354,7 @@
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
{
- final Connection tc = new NettyConnection(e.getChannel(), new Listener());
+ new NettyConnection(e.getChannel(), new Listener());
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
if (sslHandler != null)
@@ -366,7 +365,6 @@
{
if (future.isSuccess())
{
- listener.connectionCreated(tc);
active = true;
}
else
@@ -378,7 +376,6 @@
}
else
{
- listener.connectionCreated(tc);
active = true;
}
}
@@ -392,13 +389,22 @@
{
throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
}
+
+ listener.connectionCreated(connection);
}
public void connectionDestroyed(final Object connectionID)
{
if (connections.remove(connectionID) != null)
{
- listener.connectionDestroyed(connectionID);
+ //Execute on different thread to avoid deadlocks
+ new Thread()
+ {
+ public void run()
+ {
+ listener.connectionDestroyed(connectionID);
+ }
+ }.start();
}
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -130,6 +130,8 @@
public String getRemoteAddress()
{
+ log.info("channel is " + channel);
+ log.info("channel remote address " + channel.getRemoteAddress());
return channel.getRemoteAddress().toString();
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -21,6 +21,24 @@
*/
package org.jboss.messaging.integration.transports.netty;
+import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.write;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
@@ -40,8 +58,6 @@
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
-import static org.jboss.netty.channel.Channels.pipeline;
-import static org.jboss.netty.channel.Channels.write;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.UpstreamMessageEvent;
@@ -65,20 +81,6 @@
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.ssl.SslHandler;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
/**
* A NettyConnector
*
@@ -100,7 +102,7 @@
private ClientSocketChannelFactory channelFactory;
private ClientBootstrap bootstrap;
-
+
ChannelGroup channelGroup;
private final BufferHandler handler;
@@ -137,7 +139,7 @@
private ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
- private final String servletPath;
+ private final String servletPath;
// Static --------------------------------------------------------
@@ -227,8 +229,6 @@
this.tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME,
TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE,
configuration);
-
-
}
public synchronized void start()
@@ -247,8 +247,8 @@
{
channelFactory = new OioClientSocketChannelFactory(workerExecutor);
}
- //if we are a servlet wrap the socketChannelFactory
- if(useServlet)
+ // if we are a servlet wrap the socketChannelFactory
+ if (useServlet)
{
ClientSocketChannelFactory proxyChannelFactory = channelFactory;
channelFactory = new HttpTunnelingClientSocketChannelFactory(proxyChannelFactory, workerExecutor);
@@ -305,7 +305,7 @@
pipeline.addLast("httphandler", new HttpHandler());
}
ChannelPipelineSupport.addCodecFilter(pipeline, handler);
- pipeline.addLast("handler", new MessagingClientChannelHandler(channelGroup, handler, listener));
+ pipeline.addLast("handler", new MessagingClientChannelHandler(channelGroup, handler, new Listener()));
return pipeline;
}
});
@@ -317,12 +317,12 @@
{
return;
}
-
+
bootstrap = null;
channelGroup.close().awaitUninterruptibly();
channelFactory.releaseExternalResources();
channelFactory = null;
-
+
for (Connection connection : connections.values())
{
listener.connectionDestroyed(connection.getID());
@@ -438,8 +438,6 @@
private String url;
- //private String sessionId = null;
-
private Future handShakeFuture = new Future();
private boolean active = false;
@@ -452,7 +450,6 @@
private CookieEncoder cookieEncoder = new CookieEncoder();
-
public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
{
super.channelConnected(ctx, e);
@@ -481,7 +478,7 @@
@Override
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
{
- HttpResponse response = (HttpResponse) e.getMessage();
+ HttpResponse response = (HttpResponse)e.getMessage();
if (httpRequiresSessionId && !active)
{
Map<String, Cookie> cookieMap = cookieDecoder.decode(response.getHeader(HttpHeaders.Names.SET_COOKIE));
@@ -494,14 +491,11 @@
active = true;
handShakeFuture.run();
}
- MessageEvent event = new UpstreamMessageEvent(e.getChannel(),
- response.getContent(),
- e.getRemoteAddress());
+ MessageEvent event = new UpstreamMessageEvent(e.getChannel(), response.getContent(), e.getRemoteAddress());
waitingGet = false;
ctx.sendUpstream(event);
}
-
@Override
public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
{
@@ -527,7 +521,7 @@
{
httpRequest.addHeader(HttpHeaders.Names.COOKIE, cookie);
}
- ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+ ChannelBuffer buf = (ChannelBuffer)e.getMessage();
httpRequest.setContent(buf);
httpRequest.addHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buf.writerIndex()));
write(ctx, e.getFuture(), httpRequest, e.getRemoteAddress());
@@ -573,7 +567,17 @@
public void connectionDestroyed(final Object connectionID)
{
- connections.remove(connectionID);
+ if (connections.remove(connectionID) != null)
+ {
+ // Execute on different thread to avoid deadlocks
+ new Thread()
+ {
+ public void run()
+ {
+ listener.connectionDestroyed(connectionID);
+ }
+ }.start();
+ }
}
public void connectionException(final Object connectionID, final MessagingException me)
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverExpiredMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverExpiredMessageTest.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverExpiredMessageTest.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -92,12 +92,8 @@
ClientSession session1 = sf1.createSession(false, true, true);
- log.info("created session");
-
session1.createQueue(ADDRESS, ADDRESS, null, false, false);
-
- log.info("created queue");
-
+
session1.start();
ClientProducer producer = session1.createProducer(ADDRESS);
@@ -107,9 +103,7 @@
//Set time to live so at least some of them will more than likely expire before they are consumed by the client
long now = System.currentTimeMillis();
-
- log.info("sending messages");
-
+
long expire = now + 5000;
for (int i = 0; i < numMessages; i++)
@@ -122,12 +116,8 @@
message.putIntProperty(new SimpleString("count"), i);
message.getBody().writeString("aardvarks");
producer.send(message);
-
- log.info("sent message " + i);
}
- log.info("sent messages");
-
ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
final RemotingConnection conn1 = ((ClientSessionImpl)session1).getConnection();
@@ -169,13 +159,10 @@
}
else
{
- log.info("message was null");
break;
}
}
- log.info("Got " + count + " messages");
-
t.join();
session1.close();
@@ -194,16 +181,87 @@
session2.close();
}
+ public void testExpireViaReaperOnLive() throws Exception
+ {
+ ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+
+ sf1.setSendWindowSize(32 * 1024);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ log.info("created session");
+
+ session1.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+ log.info("created queue");
+
+ ClientProducer producer = session1.createProducer(ADDRESS);
+
+ final int numMessages = 10000;
+
+ //Set time to live so messages are expired on the server
+
+ long now = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session1.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ now,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().writeString("aardvarks");
+ producer.send(message);
+ }
+
+ Thread.sleep(4 * expireScanPeriod);
+
+ //Messages should all be expired now
+
+ ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
+
+ session1.start();
+
+ RemotingConnection conn1 = ((ClientSessionImpl)session1).getConnection();
+
+ conn1.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ ClientMessage message = consumer1.receive(1000);
+
+ assertNull(message);
+
+ session1.close();
+
+ //Make sure no more messages
+ ClientSession session2 = sf1.createSession(false, true, true);
+
+ session2.start();
+
+ ClientConsumer consumer2 = session2.createConsumer(ADDRESS);
+
+ message = consumer2.receive(1000);
+
+ assertNull(message);
+
+ session2.close();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+ private final long expireScanPeriod = 1000;
+
@Override
protected void setUp() throws Exception
{
super.setUp();
Configuration backupConf = new ConfigurationImpl();
+ backupConf.setMessageExpiryScanPeriod(expireScanPeriod);
backupConf.setSecurityEnabled(false);
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupConf.getAcceptorConfigurations()
@@ -214,6 +272,7 @@
backupService.start();
Configuration liveConf = new ConfigurationImpl();
+ liveConf.setMessageExpiryScanPeriod(expireScanPeriod);
liveConf.setSecurityEnabled(false);
liveConf.getAcceptorConfigurations()
.add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -913,7 +913,7 @@
Map<String, Object> params = new HashMap<String, Object>();
params.put(UUIDGenerator.getInstance().generateStringUUID(), 123);
params.put(UUIDGenerator.getInstance().generateStringUUID(), UUIDGenerator.getInstance().generateStringUUID());
- params.put(UUIDGenerator.getInstance().generateStringUUID(), 721633.123d);
+ params.put(UUIDGenerator.getInstance().generateStringUUID(), true);
TransportConfiguration tc = new TransportConfiguration(className, params, name);
return tc;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java 2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java 2009-03-07 15:30:27 UTC (rev 6036)
@@ -40,6 +40,7 @@
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
import org.jboss.messaging.core.server.Messaging;
@@ -63,7 +64,10 @@
public class JMSServerControlTest extends UnitTestCase
{
// Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(JMSServerControlTest.class);
+
// Attributes ----------------------------------------------------
private MBeanServer mbeanServer;
@@ -314,6 +318,8 @@
// FIXME: with Netty, the server is not notified immediately that the connection is closed
Thread.sleep(500);
+
+ log.info("got here");
assertEquals(0, control.listRemoteAddresses().length);
}
More information about the jboss-cvs-commits
mailing list