Author: pferraro
Date: 2009-03-19 14:08:43 -0400 (Thu, 19 Mar 2009)
New Revision: 2377
Modified:
trunk/mod_cluster/src/main/java/org/jboss/modcluster/advertise/impl/AdvertiseListenerImpl.java
trunk/mod_cluster/src/test/java/org/jboss/modcluster/advertise/AdvertiseListenerImplTestCase.java
Log:
Various optimizations/fixes.
Enhance unit test to validate pause/resume.
Modified:
trunk/mod_cluster/src/main/java/org/jboss/modcluster/advertise/impl/AdvertiseListenerImpl.java
===================================================================
---
trunk/mod_cluster/src/main/java/org/jboss/modcluster/advertise/impl/AdvertiseListenerImpl.java 2009-03-19
10:08:11 UTC (rev 2376)
+++
trunk/mod_cluster/src/main/java/org/jboss/modcluster/advertise/impl/AdvertiseListenerImpl.java 2009-03-19
18:08:43 UTC (rev 2377)
@@ -39,7 +39,6 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.catalina.util.StringManager;
import org.jboss.logging.Logger;
@@ -64,28 +63,25 @@
public static final String DEFAULT_ENCODING = "8859_1";
public static final String RFC_822_FMT = "EEE, d MMM yyyy HH:mm:ss Z";
- private static final Logger log = Logger.getLogger(AdvertiseListenerImpl.class);
+ static final Logger log = Logger.getLogger(AdvertiseListenerImpl.class);
- private int advertisePort = DEFAULT_PORT;
- private InetAddress groupAddress = null;
+ volatile boolean listening = false;
+
+ int advertisePort = DEFAULT_PORT;
+ InetAddress groupAddress = null;
private MulticastSocketFactory socketFactory;
MulticastSocket socket;
- private boolean initialized = false;
- volatile boolean listening = true;
- final AtomicBoolean running = new AtomicBoolean(false);
- final AtomicBoolean paused = new AtomicBoolean(false);
private boolean daemon = true;
- private final byte[] secure = new byte[16];
private String securityKey = null;
MessageDigest md = null;
final Map<String, AdvertisedServer> servers = new HashMap<String,
AdvertisedServer>();
final MCMPHandler commHandler;
- private Thread workerThread;
+ private AdvertiseListenerWorker workerThread;
/** The string manager for this package. */
private StringManager sm = StringManager.getManager(Constants.Package);
@@ -230,70 +226,45 @@
private synchronized void init() throws IOException
{
- if (!this.initialized)
+ if (this.socket == null)
{
- this.socket = this.socketFactory.createMulticastSocket(this.groupAddress,
this.advertisePort);
+ MulticastSocket socket =
this.socketFactory.createMulticastSocket(this.groupAddress, this.advertisePort);
- this.socket.setTimeToLive(16);
- this.socket.joinGroup(this.groupAddress);
-
- this.initialized = true;
- }
- }
+ // Limit socket send to localhost
+ socket.setTimeToLive(0);
+ socket.joinGroup(this.groupAddress);
- private void interruptDatagramReader()
- {
- if (!this.initialized) return;
-
- try
- {
- // Restrict to localhost.
- this.socket.setTimeToLive(0);
- DatagramPacket dp = new DatagramPacket(this.secure, this.secure.length,
this.groupAddress, this.advertisePort);
- this.socket.send(dp);
+ this.socket = socket;
}
- catch (IOException e)
- {
- // Ignore
- }
}
/**
* {@inheritDoc}
* @see org.jboss.modcluster.advertise.AdvertiseListener#start()
*/
- public void start() throws IOException
+ public synchronized void start() throws IOException
{
this.init();
- if (this.running.compareAndSet(false, true))
+ if (this.workerThread == null)
{
- SecureRandom random = new SecureRandom();
+ this.workerThread = new AdvertiseListenerWorker();
+ this.workerThread.setDaemon(this.daemon);
+ this.workerThread.start();
- synchronized (this.secure)
- {
- random.nextBytes(this.secure);
- this.secure[0] = 0;
- }
-
- this.paused.set(false);
this.listening = true;
-
- AdvertiseListenerWorker aw = new AdvertiseListenerWorker();
- this.workerThread = new Thread(aw);
- this.workerThread.setDaemon(this.daemon);
- this.workerThread.start();
}
}
-
+
/**
* {@inheritDoc}
* @see org.jboss.modcluster.advertise.AdvertiseListener#pause()
*/
- public void pause()
+ public synchronized void pause()
{
- if (this.running.get() && this.paused.compareAndSet(false, true))
+ if (this.workerThread != null)
{
+ this.workerThread.suspendWorker();
this.interruptDatagramReader();
}
}
@@ -302,31 +273,46 @@
* {@inheritDoc}
* @see org.jboss.modcluster.advertise.AdvertiseListener#resume()
*/
- public void resume()
+ public synchronized void resume()
{
- if (this.running.get() && this.paused.compareAndSet(true, false))
+ if (this.workerThread != null)
{
- // Genererate new private secure
- SecureRandom random = new SecureRandom();
-
- synchronized (this.secure)
- {
- random.nextBytes(this.secure);
- this.secure[0] = 0;
- }
+ this.workerThread.resumeWorker();
}
}
+ public void interruptDatagramReader()
+ {
+ if (this.socket == null) return;
+
+ DatagramPacket packet = this.workerThread.createInterruptPacket(this.groupAddress,
this.advertisePort);
+
+ try
+ {
+ this.socket.send(packet);
+ }
+ catch (IOException e)
+ {
+ log.warn("Failed to interrupt socket reception", e);
+ }
+ }
+
/**
* {@inheritDoc}
* @see org.jboss.modcluster.advertise.AdvertiseListener#stop()
*/
- public void stop()
+ public synchronized void stop()
{
- if (this.running.compareAndSet(true, false))
+ // In case worker is paused
+ this.resume();
+
+ if (this.workerThread != null)
{
+ this.workerThread.interrupt();
this.interruptDatagramReader();
this.workerThread = null;
+
+ this.listening = false;
}
}
@@ -334,11 +320,12 @@
* {@inheritDoc}
* @see org.jboss.modcluster.advertise.AdvertiseListener#destroy()
*/
- public void destroy()
+ public synchronized void destroy()
{
+ // In case worker has not been stopped
this.stop();
- if (this.initialized)
+ if (this.socket != null)
{
try
{
@@ -350,7 +337,6 @@
}
this.socket.close();
- this.initialized = false;
this.socket = null;
}
}
@@ -384,74 +370,64 @@
{
return this.listening;
}
-
- boolean matchesSecure(DatagramPacket dp)
+
+ // ------------------------------------ AdvertiseListenerWorker Inner Class
+ class AdvertiseListenerWorker extends Thread
{
- byte[] data = dp.getData();
+ private boolean paused;
+ private byte[] secure = this.generateSecure();
- synchronized (this.secure)
+ public synchronized void suspendWorker()
{
- if (dp.getLength() != this.secure.length) return false;
-
- for (int i = 0; i < this.secure.length; i++)
- {
- if (data[i] != this.secure[i])
- {
- return false;
- }
- }
+ this.paused = true;
}
- return true;
- }
-
- // ------------------------------------ AdvertiseListenerWorker Inner Class
- class AdvertiseListenerWorker implements Runnable
- {
- private DateFormat df = new SimpleDateFormat(RFC_822_FMT, Locale.US);
+ public synchronized void resumeWorker()
+ {
+ this.secure = this.generateSecure();
+ this.paused = false;
+ this.notify();
+ }
+ public DatagramPacket createInterruptPacket(InetAddress address, int port)
+ {
+ return new DatagramPacket(this.secure, this.secure.length, address, port);
+ }
+
/**
* The background thread that listens for incoming Advertise packets
* and hands them off to an appropriate AdvertiseEvent handler.
*/
+ @Override
public void run()
{
+ DateFormat dateFormat = new SimpleDateFormat(RFC_822_FMT, Locale.US);
byte[] buffer = new byte[512];
- // Loop until we receive a shutdown command
- while (AdvertiseListenerImpl.this.running.get())
+
+ // Loop until interrupted
+ while (!this.isInterrupted())
{
- // Loop if endpoint is paused
- while (AdvertiseListenerImpl.this.paused.get())
- {
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- }
try
{
- DatagramPacket dp = new DatagramPacket(buffer, buffer.length);
- AdvertiseListenerImpl.this.socket.receive(dp);
- if (!AdvertiseListenerImpl.this.running.get())
+ synchronized (this)
{
- break;
+ if (this.paused)
+ {
+ this.wait();
+ }
}
- if (AdvertiseListenerImpl.this.matchesSecure(dp)) continue;
+ DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
- byte[] data = dp.getData();
+ AdvertiseListenerImpl.this.socket.receive(packet);
- String s = new String(data, 0, dp.getLength(), DEFAULT_ENCODING);
- if (!s.startsWith("HTTP/1."))
- {
- continue;
- }
+ if (this.matchesSecure(packet)) continue;
+
+ String message = new String(packet.getData(), 0, packet.getLength(),
DEFAULT_ENCODING);
+
+ if (!message.startsWith("HTTP/1.")) continue;
- String[] headers = s.split("\r\n");
+ String[] headers = message.split("\r\n");
String date_str = null;
Date date = null;
int status = 0;
@@ -488,7 +464,7 @@
date_str = hdrv[1];
try
{
- date = this.df.parse(date_str);
+ date = dateFormat.parse(date_str);
}
catch (ParseException e)
{
@@ -544,6 +520,10 @@
}
AdvertiseListenerImpl.this.listening = true;
}
+ catch (InterruptedException e)
+ {
+ this.interrupt();
+ }
catch (IOException e)
{
// Do not blow the CPU in case of communication error
@@ -552,6 +532,34 @@
}
}
}
+
+ private byte[] generateSecure()
+ {
+ SecureRandom random = new SecureRandom();
+
+ byte[] secure = new byte[16];
+
+ random.nextBytes(secure);
+ secure[0] = 0; // why exactly?
+
+ return secure;
+ }
+
+ boolean matchesSecure(DatagramPacket packet)
+ {
+ if (packet.getLength() != this.secure.length) return false;
+
+ byte[] data = packet.getData();
+
+ for (int i = 0; i < this.secure.length; i++)
+ {
+ if (data[i] != this.secure[i])
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
}
-
}
Modified:
trunk/mod_cluster/src/test/java/org/jboss/modcluster/advertise/AdvertiseListenerImplTestCase.java
===================================================================
---
trunk/mod_cluster/src/test/java/org/jboss/modcluster/advertise/AdvertiseListenerImplTestCase.java 2009-03-19
10:08:11 UTC (rev 2376)
+++
trunk/mod_cluster/src/test/java/org/jboss/modcluster/advertise/AdvertiseListenerImplTestCase.java 2009-03-19
18:08:43 UTC (rev 2377)
@@ -35,6 +35,7 @@
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.jboss.modcluster.advertise.impl.AdvertiseListenerImpl;
+import org.jboss.modcluster.advertise.impl.MulticastSocketFactoryImpl;
import org.jboss.modcluster.config.MCMPHandlerConfiguration;
import org.jboss.modcluster.mcmp.MCMPHandler;
import org.junit.After;
@@ -53,18 +54,22 @@
private static final int ADVERTISE_PORT = 23365;
private static final String RFC_822_FMT = "EEE, d MMM yyyy HH:mm:ss Z";
private static final DateFormat df = new SimpleDateFormat(RFC_822_FMT, Locale.US);
- private static final String SERVER = "foo.bar.com";
- private static final String SERVER_ADDRESS = SERVER + ":8888";
+ private static final String SERVER1 = "foo.bar.com";
+ private static final String SERVER2 = "bar.foo.com";
+ private static final int SERVER_PORT = 8888;
+ private static final String SERVER1_ADDRESS = String.format("%s:%d",
SERVER1, SERVER_PORT);
+ private static final String SERVER2_ADDRESS = String.format("%s:%d",
SERVER2, SERVER_PORT);
private MCMPHandler mcmpHandler = EasyMock.createStrictMock(MCMPHandler.class);
private MCMPHandlerConfiguration mcmpConfig =
EasyMock.createMock(MCMPHandlerConfiguration.class);
private MulticastSocketFactory socketFactory =
EasyMock.createMock(MulticastSocketFactory.class);
- private MulticastSocket mcastSocket;
+ private MulticastSocket socket;
+ private InetAddress groupAddress;
private AdvertiseListener listener;
@Before
- public void setup()
+ public void setup() throws Exception
{
EasyMock.expect(this.mcmpConfig.getAdvertiseGroupAddress()).andReturn(ADVERTISE_GROUP);
EasyMock.expect(this.mcmpConfig.getAdvertisePort()).andReturn(ADVERTISE_PORT);
@@ -76,20 +81,18 @@
EasyMock.verify(this.mcmpConfig);
EasyMock.reset(this.mcmpConfig);
+
+ this.groupAddress = InetAddress.getByName(ADVERTISE_GROUP);
+ this.socket = new
MulticastSocketFactoryImpl().createMulticastSocket(this.groupAddress, ADVERTISE_PORT);
}
@After
public void tearDown()
{
- if (this.mcastSocket != null)
+ if ((this.socket != null) && !this.socket.isClosed())
{
- this.mcastSocket.close();
+ this.socket.close();
}
-
- if (this.listener != null)
- {
- this.listener.stop();
- }
}
@Test
@@ -97,22 +100,19 @@
{
Capture<InetAddress> capturedAddress = new Capture<InetAddress>();
-
EasyMock.expect(this.socketFactory.createMulticastSocket(EasyMock.capture(capturedAddress),
EasyMock.eq(ADVERTISE_PORT))).andReturn(new MulticastSocket(ADVERTISE_PORT));
+
EasyMock.expect(this.socketFactory.createMulticastSocket(EasyMock.capture(capturedAddress),
EasyMock.eq(ADVERTISE_PORT))).andReturn(this.socket);
- EasyMock.replay(this.socketFactory);
+ EasyMock.replay(this.socketFactory, this.mcmpHandler);
this.listener.start();
- EasyMock.verify(this.socketFactory);
+ EasyMock.verify(this.socketFactory, this.mcmpHandler);
Assert.assertEquals(ADVERTISE_GROUP,
capturedAddress.getValue().getHostAddress());
+ Assert.assertFalse(this.socket.isClosed());
- EasyMock.reset(this.socketFactory);
+ EasyMock.reset(this.socketFactory, this.mcmpHandler);
- this.mcastSocket = new MulticastSocket(ADVERTISE_PORT);
- InetAddress mcastGroup = InetAddress.getByName(ADVERTISE_GROUP);
- this.mcastSocket.joinGroup(InetAddress.getByName(ADVERTISE_GROUP));
-
String date = df.format(new Date());
StringBuilder data = new StringBuilder("HTTP/1.1 200 OK\r\n");
@@ -120,31 +120,144 @@
data.append(date);
data.append("\r\n");
data.append("Server: ");
- data.append(SERVER);
+ data.append(SERVER1);
data.append("\r\n");
data.append("X-Manager-Address: ");
- data.append(SERVER_ADDRESS);
+ data.append(SERVER1_ADDRESS);
data.append("\r\n");
byte[] buf = data.toString().getBytes();
- DatagramPacket packet = new DatagramPacket(buf, buf.length, mcastGroup,
ADVERTISE_PORT);
+ DatagramPacket packet = new DatagramPacket(buf, buf.length, this.groupAddress,
ADVERTISE_PORT);
- this.mcmpHandler.addProxy(SERVER_ADDRESS);
+ this.mcmpHandler.addProxy(SERVER1_ADDRESS);
- EasyMock.replay(this.mcmpHandler);
+ EasyMock.replay(this.socketFactory, this.mcmpHandler);
- this.mcastSocket.send(packet);
+ this.socket.send(packet);
try
{
// Give time for advertise worker to process message
- Thread.sleep(100);
+ Thread.sleep(1000);
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
- EasyMock.verify(this.mcmpHandler);
+ EasyMock.verify(this.socketFactory, this.mcmpHandler);
+ EasyMock.reset(this.socketFactory, this.mcmpHandler);
+
+ EasyMock.replay(this.socketFactory, this.mcmpHandler);
+
+ this.socket.send(packet);
+
+ try
+ {
+ // Give time for advertise worker to process message
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+
+ EasyMock.verify(this.socketFactory, this.mcmpHandler);
+ EasyMock.reset(this.socketFactory, this.mcmpHandler);
+
+ EasyMock.replay(this.socketFactory, this.mcmpHandler);
+
+ this.listener.pause();
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+
+ EasyMock.verify(this.socketFactory, this.mcmpHandler);
+ EasyMock.reset(this.socketFactory, this.mcmpHandler);
+
+ data = new StringBuilder("HTTP/1.1 200 OK\r\n");
+ data.append("Date: ");
+ data.append(date);
+ data.append("\r\n");
+ data.append("Server: ");
+ data.append(SERVER2);
+ data.append("\r\n");
+ data.append("X-Manager-Address: ");
+ data.append(SERVER2_ADDRESS);
+ data.append("\r\n");
+
+ buf = data.toString().getBytes();
+ packet = new DatagramPacket(buf, buf.length, this.groupAddress, ADVERTISE_PORT);
+
+ EasyMock.replay(this.socketFactory, this.mcmpHandler);
+
+ this.socket.send(packet);
+
+ try
+ {
+ // Give time for advertise worker to process message
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+
+ EasyMock.verify(this.socketFactory, this.mcmpHandler);
+ EasyMock.reset(this.socketFactory, this.mcmpHandler);
+
+ this.mcmpHandler.addProxy(SERVER2_ADDRESS);
+
+ EasyMock.replay(this.socketFactory, this.mcmpHandler);
+
+ this.listener.resume();
+
+ this.socket.send(packet);
+
+ try
+ {
+ // Give time for advertise worker to process message
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+
+ EasyMock.verify(this.socketFactory, this.mcmpHandler);
+ EasyMock.reset(this.socketFactory, this.mcmpHandler);
+
+ EasyMock.replay(this.socketFactory, this.mcmpHandler);
+
+ this.listener.stop();
+
+ EasyMock.verify(this.socketFactory, this.mcmpHandler);
+
+ Assert.assertFalse(this.socket.isConnected());
+
+ EasyMock.reset(this.socketFactory, this.mcmpHandler);
+
+ EasyMock.replay(this.socketFactory, this.mcmpHandler);
+
+ this.socket.send(packet);
+
+ EasyMock.verify(this.socketFactory, this.mcmpHandler);
+ EasyMock.reset(this.socketFactory, this.mcmpHandler);
+
+ EasyMock.replay(this.socketFactory, this.mcmpHandler);
+
+ this.listener.destroy();
+
+ EasyMock.verify(this.socketFactory, this.mcmpHandler);
+
+ Assert.assertTrue(this.socket.isClosed());
+
+ EasyMock.reset(this.socketFactory, this.mcmpHandler);
}
}
_______________________________________________
jbossnative-commits mailing list
jbossnative-commits(a)lists.jboss.org
https://lists.jboss.org/mailman/listinfo/jbossnative-commits