Author: pferraro
Date: 2009-04-27 12:13:05 -0400 (Mon, 27 Apr 2009)
New Revision: 2410
Modified:
trunk/mod_cluster/src/main/java/org/jboss/modcluster/advertise/impl/AdvertiseListenerImpl.java
Log:
Refined thread safety.
Eliminate entirely the possibility of null reference to socket in worker thread.
Eliminate join() in stop().
Added pause check to resumeWorker().
Modified:
trunk/mod_cluster/src/main/java/org/jboss/modcluster/advertise/impl/AdvertiseListenerImpl.java
===================================================================
--- trunk/mod_cluster/src/main/java/org/jboss/modcluster/advertise/impl/AdvertiseListenerImpl.java 2009-04-27 16:07:57 UTC (rev 2409)
+++ trunk/mod_cluster/src/main/java/org/jboss/modcluster/advertise/impl/AdvertiseListenerImpl.java 2009-04-27 16:13:05 UTC (rev 2410)
@@ -40,6 +40,8 @@
import java.util.Locale;
import java.util.Map;
+import net.jcip.annotations.GuardedBy;
+
import org.apache.catalina.util.StringManager;
import org.jboss.logging.Logger;
import org.jboss.modcluster.Constants;
@@ -71,7 +73,7 @@
InetAddress groupAddress = null;
private MulticastSocketFactory socketFactory;
- volatile MulticastSocket socket;
+ private MulticastSocket socket;
private boolean daemon = true;
@@ -248,7 +250,7 @@
if (this.workerThread == null)
{
- this.workerThread = new AdvertiseListenerWorker();
+ this.workerThread = new AdvertiseListenerWorker(this.socket);
this.workerThread.setDaemon(this.daemon);
this.workerThread.start();
@@ -281,7 +283,7 @@
}
}
- public void interruptDatagramReader()
+ public synchronized void interruptDatagramReader()
{
if (this.socket == null) return;
@@ -313,16 +315,6 @@
// In case worker is stuck on socket.receive(...)
this.interruptDatagramReader();
- try
- {
- // Wait for worker to complete
- this.workerThread.join(1000);
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
-
this.workerThread = null;
this.listening = false;
@@ -387,9 +379,17 @@
// ------------------------------------ AdvertiseListenerWorker Inner Class
class AdvertiseListenerWorker extends Thread
{
- private boolean paused;
+ private final MulticastSocket socket;
+ @GuardedBy("this")
+ private boolean paused = false;
+ @GuardedBy("this")
private byte[] secure = this.generateSecure();
+ AdvertiseListenerWorker(MulticastSocket socket)
+ {
+ this.socket = socket;
+ }
+
public synchronized void suspendWorker()
{
this.paused = true;
@@ -397,12 +397,16 @@
public synchronized void resumeWorker()
{
- this.secure = this.generateSecure();
- this.paused = false;
- this.notify();
+ if (this.paused)
+ {
+ this.paused = false;
+ this.secure = this.generateSecure();
+ // Notify run() thread waiting on pause
+ this.notify();
+ }
}
- public DatagramPacket createInterruptPacket(InetAddress address, int port)
+ public synchronized DatagramPacket createInterruptPacket(InetAddress address, int port)
{
return new DatagramPacket(this.secure, this.secure.length, address, port);
}
@@ -426,16 +430,16 @@
{
if (this.paused)
{
+ // Wait for notify in resumeWorker()
this.wait();
}
}
- if (AdvertiseListenerImpl.this.socket == null)
- break;
-
+
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
- AdvertiseListenerImpl.this.socket.receive(packet);
+ this.socket.receive(packet);
+ // Skip processing if interrupt packet received
if (this.matchesSecure(packet)) continue;
String message = new String(packet.getData(), 0, packet.getLength(), DEFAULT_ENCODING);
@@ -533,6 +537,7 @@
//eventHandler.onEvent(AdvertiseEventType.ON_STATUS_CHANGE, server);
}
}
+
AdvertiseListenerImpl.this.listening = true;
}
catch (InterruptedException e)
@@ -541,8 +546,9 @@
}
catch (IOException e)
{
+ AdvertiseListenerImpl.this.listening = false;
+
// Do not blow the CPU in case of communication error
- AdvertiseListenerImpl.this.listening = false;
Thread.yield();
}
}
@@ -560,7 +566,7 @@
return secure;
}
- boolean matchesSecure(DatagramPacket packet)
+ synchronized boolean matchesSecure(DatagramPacket packet)
{
if (packet.getLength() != this.secure.length) return false;
Show replies by date