[infinispan-commits] Infinispan SVN: r1742 - in trunk: server/core/src/main/resources and 20 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed May 5 13:32:58 EDT 2010
Author: manik.surtani at jboss.com
Date: 2010-05-05 13:32:57 -0400 (Wed, 05 May 2010)
New Revision: 1742
Added:
trunk/server/websocket/README-WebSocket-Server.txt
trunk/server/websocket/src/main/distribution/
trunk/server/websocket/src/main/distribution/sample-websocket-client.html
trunk/server/websocket/src/main/java/org/infinispan/server/
trunk/server/websocket/src/main/java/org/infinispan/server/websocket/
trunk/server/websocket/src/main/java/org/infinispan/server/websocket/CacheListener.java
trunk/server/websocket/src/main/java/org/infinispan/server/websocket/ChannelUtils.java
trunk/server/websocket/src/main/java/org/infinispan/server/websocket/OpHandler.java
trunk/server/websocket/src/main/java/org/infinispan/server/websocket/WebSocketServer.java
trunk/server/websocket/src/main/java/org/infinispan/server/websocket/WebSocketServerHandler.java
trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers/
trunk/server/websocket/src/main/resources/org/infinispan/server/
trunk/server/websocket/src/main/resources/org/infinispan/server/websocket/
trunk/server/websocket/src/main/resources/org/infinispan/server/websocket/infinispan-ws.js
trunk/server/websocket/src/test/java/org/infinispan/server/
trunk/server/websocket/src/test/java/org/infinispan/server/websocket/
trunk/server/websocket/src/test/java/org/infinispan/server/websocket/handlers/
Removed:
trunk/server/websocket/README.html
trunk/server/websocket/gui-demo/
trunk/server/websocket/infinispan.xml
trunk/server/websocket/src/main/java/org/infinispan/websocket/CacheListener.java
trunk/server/websocket/src/main/java/org/infinispan/websocket/ChannelUtils.java
trunk/server/websocket/src/main/java/org/infinispan/websocket/OpHandler.java
trunk/server/websocket/src/main/java/org/infinispan/websocket/WebSocketServer.java
trunk/server/websocket/src/main/java/org/infinispan/websocket/WebSocketServerHandler.java
trunk/server/websocket/src/main/java/org/infinispan/websocket/handlers/
trunk/server/websocket/src/main/java/org/json/
trunk/server/websocket/src/main/resources/org/infinispan/websocket/
trunk/server/websocket/src/test/java/org/infinispan/websocket/handlers/
Modified:
trunk/pom.xml
trunk/server/core/src/main/resources/startServer.bat
trunk/server/core/src/main/resources/startServer.sh
trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala
trunk/server/rest/pom.xml
trunk/server/websocket/pom.xml
trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers/GetHandler.java
trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers/NotifyHandler.java
trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers/PutHandler.java
trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers/RemoveHandler.java
trunk/server/websocket/src/test/java/org/infinispan/server/websocket/handlers/MockClient.java
trunk/server/websocket/src/test/java/org/infinispan/server/websocket/handlers/OpHandlerTest.java
trunk/src/main/resources/assemblies/all.xml
trunk/src/main/resources/assemblies/bin.xml
Log:
[ISPN-405] (WebSocket Server) reorganised
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/pom.xml 2010-05-05 17:32:57 UTC (rev 1742)
@@ -33,6 +33,7 @@
<module>server/core</module>
<module>server/memcached</module>
<module>server/hotrod</module>
+ <module>server/websocket</module>
<module>client/hotrod-client</module>
<module>jopr-plugin</module>
<module>demos/gui</module>
Modified: trunk/server/core/src/main/resources/startServer.bat
===================================================================
--- trunk/server/core/src/main/resources/startServer.bat 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/server/core/src/main/resources/startServer.bat 2010-05-05 17:32:57 UTC (rev 1742)
@@ -6,11 +6,13 @@
for %%f in (..\lib\*.jar) do set LIB=!LIB!;%%f
for %%f in (..\modules\memcached\lib\*.jar) do set LIB=!LIB!;%%f
for %%f in (..\modules\hotrod\lib\*.jar) do set LIB=!LIB!;%%f
+for %%f in (..\modules\websocket\lib\*.jar) do set LIB=!LIB!;%%f
rem echo libs: %LIB%
set CP=%LIB%;..\infinispan-core.jar;..\modules\core\infinispan-server-memcached.jar;%CP%
set CP=%LIB%;..\modules\memcached\infinispan-server-memcached.jar;%CP%
set CP=%LIB%;..\modules\hotrod\infinispan-server-hotrod.jar;%CP%
+set CP=%LIB%;..\modules\hotrod\infinispan-server-websocket.jar;%CP%
java -classpath "%CP%" -Dbind.address=127.0.0.1 -Djava.net.preferIPv4Stack=true -Dlog4j.configuration=..\etc\log4j.xml org.infinispan.server.core.Main %*
Modified: trunk/server/core/src/main/resources/startServer.sh
===================================================================
--- trunk/server/core/src/main/resources/startServer.sh 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/server/core/src/main/resources/startServer.sh 2010-05-05 17:32:57 UTC (rev 1742)
@@ -39,6 +39,17 @@
done
fi
+CP=${CP}:${ISPN_HOME}/modules/websocket/infinispan-server-websocket.jar
+
+if [ -e ${ISPN_HOME}/modules/websocket/lib ]
+then
+ for JAR in ${ISPN_HOME}/modules/websocket/lib/*
+ do
+ CP=$CP:$JAR
+ done
+fi
+
+
JVM_PARAMS="${JVM_PARAMS} -Dbind.address=127.0.0.1 -Djava.net.preferIPv4Stack=true -Dlog4j.configuration=file:${ISPN_HOME}/etc/log4j.xml"
# Sample JPDA settings for remote socket debuging
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala 2010-05-05 17:32:57 UTC (rev 1742)
@@ -101,12 +101,14 @@
val clazz = protocol.get match {
case "memcached" => "org.infinispan.server.memcached.MemcachedServer"
case "hotrod" => "org.infinispan.server.hotrod.HotRodServer"
+ case "websocket" => "org.infinispan.server.websocket.WebSocketServer"
}
val port = {
if (props.get(PROP_KEY_PORT) == None) {
protocol.get match {
case "memcached" => 11211
case "hotrod" => 11311
+ case "websocket" => 8181
}
} else {
props.get(PROP_KEY_PORT).get.toInt
@@ -188,7 +190,7 @@
println
println(" -- Stop processing options")
println
- println(" -p, --port=<num> TCP port number to listen on (default: 11211 for Memcached servers, 11311 for Hot Rod servers)")
+ println(" -p, --port=<num> TCP port number to listen on (default: 11211 for Memcached, 11311 for Hot Rod and 8181 for WebSocket server)")
println
println(" -l, --host=<host or ip> Interface to listen on (default: 127.0.0.1, localhost)")
println
@@ -198,7 +200,8 @@
println
println(" -c, --cache_config=<filename> Cache configuration file (default: creates cache with default values)")
println
- println(" -r, --protocol=[memcached|hotrod] Protocol to understand by the server. This is a mandatory option and you should choose one of the two options")
+ println(" -r, --protocol= Protocol to understand by the server. This is a mandatory option and you should choose one of the two options")
+ println(" [memcached|hotrod|websocket]")
println
println(" -i, --idle_timeout=<num> Idle read timeout, in seconds, used to detect stale connections (default: 60 seconds).")
println(" If no new messages have been read within this time, the server disconnects the channel.")
Modified: trunk/server/rest/pom.xml
===================================================================
--- trunk/server/rest/pom.xml 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/server/rest/pom.xml 2010-05-05 17:32:57 UTC (rev 1742)
@@ -63,7 +63,6 @@
<version.jetty>6.1.15</version.jetty>
<version.commons.httpclient>3.1</version.commons.httpclient>
<version.slf4j>1.5.8</version.slf4j>
- <!-- TODO: Find out why on earth when scala version is set to 2.7.7 here, memcached and hotrod modules that depend on 2.8.0.Beta1, receive 2.7.7 library on assembly!!! -->
</properties>
<dependencies>
Copied: trunk/server/websocket/README-WebSocket-Server.txt (from rev 1738, trunk/server/websocket/README.html)
===================================================================
--- trunk/server/websocket/README-WebSocket-Server.txt (rev 0)
+++ trunk/server/websocket/README-WebSocket-Server.txt 2010-05-05 17:32:57 UTC (rev 1742)
@@ -0,0 +1 @@
+See http://community.jboss.org/wiki/InfinispanWebSocketServer
\ No newline at end of file
Deleted: trunk/server/websocket/README.html
===================================================================
--- trunk/server/websocket/README.html 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/server/websocket/README.html 2010-05-05 17:32:57 UTC (rev 1742)
@@ -1 +0,0 @@
-<a href="http://www.screencast.com/t/OGU2NWZl">See Screencast >></a>
\ No newline at end of file
Deleted: trunk/server/websocket/infinispan.xml
===================================================================
--- trunk/server/websocket/infinispan.xml 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/server/websocket/infinispan.xml 2010-05-05 17:32:57 UTC (rev 1742)
@@ -1 +0,0 @@
-<infinispan xmlns="urn:infinispan:config:4.0" />
\ No newline at end of file
Modified: trunk/server/websocket/pom.xml
===================================================================
--- trunk/server/websocket/pom.xml 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/server/websocket/pom.xml 2010-05-05 17:32:57 UTC (rev 1742)
@@ -13,15 +13,20 @@
<name>Infinispan WebSocket Server</name>
<description>WebSocket interface for Infinispan</description>
- <dependencies>
-
- <dependency>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.2.0.BETA1</version>
- <scope>compile</scope>
- </dependency>
-
- </dependencies>
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>infinispan-server-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ <version>20090211</version>
+ </dependency>
+
+ </dependencies>
+
</project>
Copied: trunk/server/websocket/src/main/distribution/sample-websocket-client.html (from rev 1738, trunk/server/websocket/gui-demo/gui-demo.html)
===================================================================
--- trunk/server/websocket/src/main/distribution/sample-websocket-client.html (rev 0)
+++ trunk/server/websocket/src/main/distribution/sample-websocket-client.html 2010-05-05 17:32:57 UTC (rev 1742)
@@ -0,0 +1,44 @@
+<html>
+ <head>
+ <title>Infinispan Cache Query</title>
+
+ <script type="text/javascript" src="http://localhost:8181/infinispan-ws.js" />
+ <script type="text/javascript">
+ <!--
+ var cache = new Cache();
+ var curKey;
+
+ cache.registerCallback(cacheCallback);
+
+ function cacheCallback(key, value) {
+ if(key == curKey) {
+ var node = document.getElementById("val");
+ node.value = value;
+ }
+ }
+
+ function keyChange(newKey) {
+ // turn off notification for old key...
+ cache.unnotify(curKey);
+ // turn on notification of new key...
+ cache.notify(newKey);
+ // record the new curkey so we can turn off notification on next keychange...
+ curKey = newKey;
+ }
+ -->
+ </script>
+ </head>
+ <body>
+ <form onsubmit="return false;">
+ Key:
+ <input type="text" id="key" onchange="keyChange(this.form.key.value)" />
+ Value:
+ <input type="text" id="val" /><br/>
+
+ <input type="button" value="Put" onclick="cache.put(this.form.key.value, this.form.val.value)" />
+ <input type="button" value="Get" onclick="cache.get(this.form.key.value)" />
+ <input type="button" value="Remove" onclick="cache.remove(this.form.key.value)" />
+ </form>
+
+ </body>
+</html>
Copied: trunk/server/websocket/src/main/java/org/infinispan/server/websocket/CacheListener.java (from rev 1738, trunk/server/websocket/src/main/java/org/infinispan/websocket/CacheListener.java)
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/server/websocket/CacheListener.java (rev 0)
+++ trunk/server/websocket/src/main/java/org/infinispan/server/websocket/CacheListener.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -0,0 +1,202 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.websocket;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.infinispan.Cache;
+import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
+import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
+import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
+import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
+import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
+import org.infinispan.notifications.cachelistener.event.Event;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.handler.codec.http.websocket.DefaultWebSocketFrame;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Cache listener.
+ * <p/>
+ * Used to notify websocket clients of cache entry updates.
+ *
+ * @author <a href="mailto:tom.fennelly at gmail.com">tom.fennelly at gmail.com</a>
+ */
+ at Listener
+public class CacheListener {
+
+ private List<ChannelNotifyParams> channels = new CopyOnWriteArrayList<ChannelNotifyParams>();
+
+ @CacheEntryCreated
+ public void cacheEntryCreated(CacheEntryCreatedEvent event) {
+ notifyChannels(event, event.getType());
+ }
+
+ @CacheEntryModified
+ public void cacheEntryModified(CacheEntryModifiedEvent event) {
+ notifyChannels(event, event.getType());
+ }
+
+ @CacheEntryRemoved
+ public void cacheEntryRemoved(CacheEntryRemovedEvent event) {
+ notifyChannels(event, event.getType());
+ }
+
+ private void notifyChannels(CacheEntryEvent event, Event.Type eventType) {
+ if(event.isPre()) {
+ return;
+ }
+
+ JSONObject jsonObject;
+
+ try {
+ Cache<Object, Object> cache = event.getCache();
+ Object key = event.getKey();
+ Object value;
+
+ switch(eventType) {
+ case CACHE_ENTRY_CREATED:
+ // TODO: Add optimization ... don't get from cache if non of the channels are interested in creates...
+ value = cache.get(key);
+ jsonObject = ChannelUtils.toJSON(key.toString(), value, cache.getName());
+ break;
+ case CACHE_ENTRY_MODIFIED:
+ value = ((CacheEntryModifiedEvent)event).getValue();
+ jsonObject = ChannelUtils.toJSON(key.toString(), value, cache.getName());
+ break;
+ case CACHE_ENTRY_REMOVED:
+ jsonObject = ChannelUtils.toJSON(key.toString(), null, cache.getName());
+ break;
+ default:
+ return;
+ }
+
+ jsonObject.put("eventType", eventType.toString());
+ } catch (JSONException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return;
+ }
+
+ String jsonString = jsonObject.toString();
+ for(ChannelNotifyParams channel : channels) {
+ if(channel.channel.isOpen() && channel.onEvents.contains(eventType)) {
+ if(channel.key != null) {
+ if(event.getKey().equals(channel.key) || channel.key.equals("*")) {
+ channel.channel.write(new DefaultWebSocketFrame(jsonString));
+ }
+ } else {
+ channel.channel.write(new DefaultWebSocketFrame(jsonString));
+ }
+ }
+ }
+ }
+
+ public void addChannel(ChannelNotifyParams channel) {
+ if(!channels.contains(channel)) {
+ channels.add(channel);
+ channel.channel.getCloseFuture().addListener(new ChannelCloseFutureListener());
+ }
+ }
+
+ public void removeChannel(ChannelNotifyParams channel) {
+ channels.remove(channel);
+ }
+
+ public static class ChannelNotifyParams {
+
+ private static final String[] DEFAULT_EVENTS = {Event.Type.CACHE_ENTRY_MODIFIED.toString(), Event.Type.CACHE_ENTRY_REMOVED.toString()};
+
+ private Channel channel;
+ private String key;
+ private List<Event.Type> onEvents = new ArrayList<Event.Type>();
+
+ public ChannelNotifyParams(Channel channel, String key, String[] onEvents) {
+ if(channel == null) {
+ throw new IllegalArgumentException("null 'channel' arg in constructor call.");
+ }
+ String[] onEventsSpec = onEvents;
+
+ this.channel = channel;
+ this.key = key;
+
+ if(onEventsSpec == null) {
+ onEventsSpec = DEFAULT_EVENTS;
+ }
+ for(String eventType : onEventsSpec) {
+ try {
+ this.onEvents.add(Event.Type.valueOf(eventType));
+ } catch(RuntimeException e) {
+ // Ignore for now
+ }
+ }
+
+ if(onEvents == null && key.equals("*")) {
+ this.onEvents.add(Event.Type.CACHE_ENTRY_CREATED);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if(obj instanceof ChannelNotifyParams) {
+ ChannelNotifyParams channelNotifyParams = (ChannelNotifyParams) obj;
+ if(channelNotifyParams.channel == channel) {
+ if(key == null) {
+ return (channelNotifyParams.key == null);
+ } else {
+ return key.equals(channelNotifyParams.key);
+ }
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ if(key != null) {
+ return super.hashCode() + channel.hashCode() + key.hashCode();
+ } else {
+ return super.hashCode() + channel.hashCode();
+ }
+ }
+ }
+
+ private class ChannelCloseFutureListener implements ChannelFutureListener {
+
+ public void operationComplete(ChannelFuture channelCloseFuture) throws Exception {
+ for(ChannelNotifyParams channel : channels) {
+ if(channelCloseFuture.getChannel() == channel.channel) {
+ removeChannel(channel);
+ }
+ }
+ }
+ }
+}
Copied: trunk/server/websocket/src/main/java/org/infinispan/server/websocket/ChannelUtils.java (from rev 1738, trunk/server/websocket/src/main/java/org/infinispan/websocket/ChannelUtils.java)
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/server/websocket/ChannelUtils.java (rev 0)
+++ trunk/server/websocket/src/main/java/org/infinispan/server/websocket/ChannelUtils.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -0,0 +1,83 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.websocket;
+
+import org.infinispan.Cache;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.websocket.DefaultWebSocketFrame;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Channel Utilities.
+ *
+ * @author <a href="mailto:tom.fennelly at gmail.com">tom.fennelly at gmail.com</a>
+ */
+public class ChannelUtils {
+
+ /**
+ * Push a cache entry value out onto the websocket channel (to the browser).
+ * @param key The cache entry key whose value is to be pushed to the browser.
+ * @param cache The cache containing the key.
+ * @param ctx The channel context associated with the browser websocket channel..
+ * @throws JSONException Error generating JSON string.
+ */
+ public static void pushCacheValue(String key, Cache<Object, Object> cache, ChannelHandlerContext ctx) throws JSONException {
+ Object value = cache.get(key);
+
+ JSONObject responseObject = toJSON(key, value, cache.getName());
+
+ // Write the JSON response out onto the channel...
+ ctx.getChannel().write(new DefaultWebSocketFrame(responseObject.toString()));
+ }
+
+ /**
+ * Cache key, value and cache-name to JSON string.
+ * @param key The cache key.
+ * @param value The cache value.
+ * @param cacheName The cache name.
+ * @return JSON Object representing a cache entry payload for transmission to the browser channel.
+ * @throws JSONException Error generating JSON string.
+ */
+ public static JSONObject toJSON(String key, Object value, String cacheName) throws JSONException {
+ JSONObject jsonObject = new JSONObject();
+
+ jsonObject.put(OpHandler.CACHE_NAME, cacheName);
+ jsonObject.put(OpHandler.KEY, key);
+
+ if(value != null) {
+ // Encode the cache value as JSON...
+ JSONObject valueObject = new JSONObject(value);
+ if(valueObject.get("bytes") == null) {
+ jsonObject.put(OpHandler.VALUE, valueObject.toString());
+ jsonObject.put(OpHandler.MIME, "application/json");
+ } else {
+ jsonObject.put(OpHandler.VALUE, value);
+ jsonObject.put(OpHandler.MIME, "text/plain");
+ }
+ } else {
+ jsonObject.put(OpHandler.VALUE, JSONObject.NULL);
+ }
+
+ return jsonObject;
+ }
+}
Property changes on: trunk/server/websocket/src/main/java/org/infinispan/server/websocket/ChannelUtils.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Copied: trunk/server/websocket/src/main/java/org/infinispan/server/websocket/OpHandler.java (from rev 1738, trunk/server/websocket/src/main/java/org/infinispan/websocket/OpHandler.java)
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/server/websocket/OpHandler.java (rev 0)
+++ trunk/server/websocket/src/main/java/org/infinispan/server/websocket/OpHandler.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.websocket;
+
+import org.infinispan.Cache;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Websocket cache operation handler.
+ *
+ * @author <a href="mailto:tom.fennelly at gmail.com">tom.fennelly at gmail.com</a>
+ */
+public interface OpHandler {
+
+ public static final String OP_CODE = "opCode";
+ public static final String CACHE_NAME = "cacheName";
+ public static final String KEY = "key";
+ public static final String VALUE = "value";
+ public static final String MIME = "mime";
+
+ /**
+ * Handle a websocket channel operation.
+ *
+ * @param opPayload Operation payload.
+ * @param cache The target cache.
+ * @param ctx The Netty websocket channel handler.
+ */
+ void handleOp(JSONObject opPayload, Cache<Object, Object> cache, ChannelHandlerContext ctx) throws JSONException;
+}
Copied: trunk/server/websocket/src/main/java/org/infinispan/server/websocket/WebSocketServer.java (from rev 1738, trunk/server/websocket/src/main/java/org/infinispan/websocket/WebSocketServer.java)
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/server/websocket/WebSocketServer.java (rev 0)
+++ trunk/server/websocket/src/main/java/org/infinispan/server/websocket/WebSocketServer.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -0,0 +1,173 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.websocket;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import org.infinispan.Cache;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.server.core.AbstractProtocolServer;
+import org.infinispan.server.websocket.handlers.GetHandler;
+import org.infinispan.server.websocket.handlers.NotifyHandler;
+import org.infinispan.server.websocket.handlers.PutHandler;
+import org.infinispan.server.websocket.handlers.RemoveHandler;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.infinispan.server.core.transport.Encoder;
+import org.infinispan.server.core.transport.Decoder;
+
+/**
+ * An HTTP server which serves Web Socket requests on an Infinispan cacheManager.
+ * <p/>
+ * Websocket specific code lifted from Netty WebSocket Server example.
+ */
+public class WebSocketServer extends AbstractProtocolServer {
+
+ public static final String INFINISPAN_WS_JS_FILENAME = "infinispan-ws.js";
+
+ private static String javascript;
+ private Channel channel;
+
+ public WebSocketServer() {
+ super("WebSocketServerThread");
+ }
+
+ public Encoder getEncoder() {
+ return null;
+ }
+
+ public Decoder getDecoder() {
+ return null;
+ }
+
+ @Override
+ public void start(String host, int port, CacheManager cacheManager, int masterThreads, int workerThreads,
+ int idleTimeout) {
+ InetSocketAddress address = new InetSocketAddress(host, port);
+
+ Executor masterExecutor =
+ masterThreads == 0 ?
+ Executors.newCachedThreadPool() :
+ Executors.newFixedThreadPool(masterThreads);
+ Executor workerExecutor =
+ workerThreads == 0 ?
+ Executors.newCachedThreadPool():
+ Executors.newFixedThreadPool(masterThreads);
+
+ NioServerSocketChannelFactory factory =
+ workerThreads == 0 ?
+ new NioServerSocketChannelFactory(masterExecutor, workerExecutor) :
+ new NioServerSocketChannelFactory(masterExecutor, workerExecutor, workerThreads);
+
+ // Configure the server.
+ ServerBootstrap bootstrap = new ServerBootstrap(factory);
+
+ // Set up the event pipeline factory.
+ bootstrap.setPipelineFactory(new WebSocketServerPipelineFactory(cacheManager));
+
+ // Bind and start to accept incoming connections.
+ bootstrap.bind(address);
+ }
+
+ @Override
+ public void stop() {
+ if (channel != null) channel.close();
+ }
+
+ private static class WebSocketServerPipelineFactory implements ChannelPipelineFactory {
+
+ private CacheManager cacheManager;
+ private Map<String, OpHandler> operationHandlers;
+ private Map<String, Cache> startedCaches = new ConcurrentHashMap<String, Cache>();
+
+ public WebSocketServerPipelineFactory(CacheManager cacheManager) {
+ this.cacheManager = cacheManager;
+
+ operationHandlers = new HashMap<String, OpHandler>();
+ operationHandlers.put("put", new PutHandler());
+ operationHandlers.put("get", new GetHandler());
+ operationHandlers.put("remove", new RemoveHandler());
+ NotifyHandler notifyHandler = new NotifyHandler();
+ operationHandlers.put("notify", notifyHandler);
+ operationHandlers.put("unnotify", notifyHandler);
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = Channels.pipeline();
+
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("handler", new WebSocketServerHandler(cacheManager, operationHandlers, startedCaches));
+
+ return pipeline;
+ }
+ }
+
+ public static String getJavascript() {
+ if (javascript != null) {
+ return javascript;
+ }
+
+ BufferedReader scriptReader = new BufferedReader(new InputStreamReader(WebSocketServer.class.getResourceAsStream(INFINISPAN_WS_JS_FILENAME)));
+
+ try {
+ StringWriter writer = new StringWriter();
+
+ String line = scriptReader.readLine();
+ while (line != null) {
+ writer.write(line);
+ writer.write('\n');
+ line = scriptReader.readLine();
+ }
+
+ javascript = writer.toString();
+
+ return javascript;
+ } catch (IOException e) {
+ throw new IllegalStateException("Unexpected exception while sending Websockets script to client.", e);
+ } finally {
+ try {
+ scriptReader.close();
+ } catch (IOException e) {
+ throw new IllegalStateException("Unexpected exception while closing Websockets script to client.", e);
+ }
+ }
+ }
+}
\ No newline at end of file
Copied: trunk/server/websocket/src/main/java/org/infinispan/server/websocket/WebSocketServerHandler.java (from rev 1738, trunk/server/websocket/src/main/java/org/infinispan/websocket/WebSocketServerHandler.java)
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/server/websocket/WebSocketServerHandler.java (rev 0)
+++ trunk/server/websocket/src/main/java/org/infinispan/server/websocket/WebSocketServerHandler.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -0,0 +1,213 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.websocket;
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.*;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.*;
+import static org.jboss.netty.handler.codec.http.HttpMethod.*;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.*;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+import org.infinispan.Cache;
+import org.infinispan.manager.CacheManager;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
+import org.jboss.netty.handler.codec.http.HttpHeaders.Values;
+import org.jboss.netty.handler.codec.http.websocket.WebSocketFrame;
+import org.jboss.netty.handler.codec.http.websocket.WebSocketFrameDecoder;
+import org.jboss.netty.handler.codec.http.websocket.WebSocketFrameEncoder;
+import org.jboss.netty.util.CharsetUtil;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Web Socket Server Handler (Netty).
+ * <p/>
+ * Websocket specific code lifted from Netty WebSocket Server example.
+ */
+public class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
+
+ private static final String INFINISPAN_WS_JS_FILENAME = "infinispan-ws.js";
+ private CacheManager cacheManager;
+ private Map<String, OpHandler> operationHandlers;
+ private boolean connectionUpgraded;
+ private Map<String, Cache> startedCaches;
+
+ public WebSocketServerHandler(CacheManager cacheManager, Map<String, OpHandler> operationHandlers, Map<String, Cache> startedCaches) {
+ this.cacheManager = cacheManager;
+ this.operationHandlers = operationHandlers;
+ this.startedCaches = startedCaches;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ Object msg = e.getMessage();
+ if (msg instanceof HttpRequest) {
+ handleHttpRequest(ctx, (HttpRequest) msg);
+ } else if (msg instanceof WebSocketFrame) {
+ handleWebSocketFrame(ctx, (WebSocketFrame) msg);
+ }
+ }
+
+ private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
+ // Allow only GET methods.
+ if (req.getMethod() != GET) {
+ sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, FORBIDDEN));
+ return;
+ }
+
+ if(!connectionUpgraded && req.getUri().equalsIgnoreCase("/" + INFINISPAN_WS_JS_FILENAME)) {
+ DefaultHttpResponse res = new DefaultHttpResponse(HTTP_1_1, OK);
+ loadScriptToResponse(req, res);
+ sendHttpResponse(ctx, req, res);
+ return;
+ } else if (Values.UPGRADE.equalsIgnoreCase(req.getHeader(CONNECTION)) &&
+ WEBSOCKET.equalsIgnoreCase(req.getHeader(Names.UPGRADE))) {
+
+ // Serve the WebSocket handshake request.
+
+ // Create the WebSocket handshake response.
+ HttpResponse res = new DefaultHttpResponse(HTTP_1_1, new HttpResponseStatus(101, "Web Socket Protocol Handshake"));
+ res.addHeader(Names.UPGRADE, WEBSOCKET);
+ res.addHeader(CONNECTION, Values.UPGRADE);
+ res.addHeader(WEBSOCKET_ORIGIN, req.getHeader(ORIGIN));
+ res.addHeader(WEBSOCKET_LOCATION, getWebSocketLocation(req));
+ String protocol = req.getHeader(WEBSOCKET_PROTOCOL);
+ if (protocol != null) {
+ res.addHeader(WEBSOCKET_PROTOCOL, protocol);
+ }
+
+ // Upgrade the connection and send the handshake response.
+ ChannelPipeline p = ctx.getChannel().getPipeline();
+ p.remove("aggregator");
+ p.replace("decoder", "wsdecoder", new WebSocketFrameDecoder());
+
+ ctx.getChannel().write(res);
+
+ p.replace("encoder", "wsencoder", new WebSocketFrameEncoder());
+ return;
+ }
+
+ // Send an error page otherwise.
+ sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, FORBIDDEN));
+ }
+
+ private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
+ try {
+ JSONObject payload = new JSONObject(frame.getTextData());
+ String opCode = (String) payload.get(OpHandler.OP_CODE);
+ String cacheName = (String) payload.opt(OpHandler.CACHE_NAME);
+ Cache<Object, Object> cache = getCache(cacheName);
+
+ OpHandler handler = operationHandlers.get(opCode);
+ if(handler != null) {
+ handler.handleOp(payload, cache, ctx);
+ }
+ } catch (JSONException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ private Cache<Object, Object> getCache(final String cacheName) {
+ String key = cacheName;
+ Cache<Object, Object> cache;
+
+ if(key == null) {
+ key = "";
+ }
+
+ cache = startedCaches.get(key);
+
+ if(cache == null) {
+ synchronized (startedCaches) {
+ cache = startedCaches.get(key);
+ if(cache == null) {
+ if(cacheName != null) {
+ cache = cacheManager.getCache(key);
+ } else {
+ cache = cacheManager.getCache();
+ }
+ startedCaches.put(key, cache);
+ cache.start();
+ }
+ }
+ }
+
+ return cache;
+ }
+
+ private void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
+ // Generate an error page if response status code is not OK (200).
+ if (res.getStatus().getCode() != 200) {
+ res.setContent(ChannelBuffers.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8));
+ setContentLength(res, res.getContent().readableBytes());
+ }
+
+ // Send the response and close the connection if necessary.
+ ChannelFuture f = ctx.getChannel().write(res);
+ if (!isKeepAlive(req) || res.getStatus().getCode() != 200) {
+ f.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+ private void loadScriptToResponse(HttpRequest req, DefaultHttpResponse res) {
+ String wsAddress = getWebSocketLocation(req);
+
+ StringWriter writer = new StringWriter();
+ writer.write("var defaultWSAddress = '" + wsAddress + "';");
+ writer.write(WebSocketServer.getJavascript());
+
+ ChannelBuffer content = ChannelBuffers.copiedBuffer(writer.toString(), CharsetUtil.UTF_8);
+
+ res.setHeader(CONTENT_TYPE, "text/javascript; charset=UTF-8");
+ setContentLength(res, content.readableBytes());
+ res.setContent(content);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ e.getCause().printStackTrace();
+ e.getChannel().close();
+ }
+
+ private String getWebSocketLocation(HttpRequest req) {
+ return "ws://" + req.getHeader(HttpHeaders.Names.HOST) + "/";
+ }
+}
\ No newline at end of file
Copied: trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers (from rev 1738, trunk/server/websocket/src/main/java/org/infinispan/websocket/handlers)
Modified: trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers/GetHandler.java
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/websocket/handlers/GetHandler.java 2010-05-05 13:02:55 UTC (rev 1738)
+++ trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers/GetHandler.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -19,11 +19,11 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.infinispan.websocket.handlers;
+package org.infinispan.server.websocket.handlers;
import org.infinispan.Cache;
-import org.infinispan.websocket.ChannelUtils;
-import org.infinispan.websocket.OpHandler;
+import org.infinispan.server.websocket.ChannelUtils;
+import org.infinispan.server.websocket.OpHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.json.JSONException;
import org.json.JSONObject;
Modified: trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers/NotifyHandler.java
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/websocket/handlers/NotifyHandler.java 2010-05-05 13:02:55 UTC (rev 1738)
+++ trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers/NotifyHandler.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -19,16 +19,16 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.infinispan.websocket.handlers;
+package org.infinispan.server.websocket.handlers;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.infinispan.Cache;
-import org.infinispan.websocket.CacheListener;
-import org.infinispan.websocket.ChannelUtils;
-import org.infinispan.websocket.OpHandler;
-import org.infinispan.websocket.CacheListener.ChannelNotifyParams;
+import org.infinispan.server.websocket.CacheListener;
+import org.infinispan.server.websocket.ChannelUtils;
+import org.infinispan.server.websocket.OpHandler;
+import org.infinispan.server.websocket.CacheListener.ChannelNotifyParams;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.json.JSONException;
import org.json.JSONObject;
Modified: trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers/PutHandler.java
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/websocket/handlers/PutHandler.java 2010-05-05 13:02:55 UTC (rev 1738)
+++ trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers/PutHandler.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -19,10 +19,10 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.infinispan.websocket.handlers;
+package org.infinispan.server.websocket.handlers;
import org.infinispan.Cache;
-import org.infinispan.websocket.OpHandler;
+import org.infinispan.server.websocket.OpHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.json.JSONException;
import org.json.JSONObject;
Modified: trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers/RemoveHandler.java
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/websocket/handlers/RemoveHandler.java 2010-05-05 13:02:55 UTC (rev 1738)
+++ trunk/server/websocket/src/main/java/org/infinispan/server/websocket/handlers/RemoveHandler.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -19,10 +19,10 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.infinispan.websocket.handlers;
+package org.infinispan.server.websocket.handlers;
import org.infinispan.Cache;
-import org.infinispan.websocket.OpHandler;
+import org.infinispan.server.websocket.OpHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.json.JSONException;
import org.json.JSONObject;
Deleted: trunk/server/websocket/src/main/java/org/infinispan/websocket/CacheListener.java
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/websocket/CacheListener.java 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/server/websocket/src/main/java/org/infinispan/websocket/CacheListener.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -1,202 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2006, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.websocket;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.infinispan.Cache;
-import org.infinispan.notifications.Listener;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
-import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
-import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
-import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
-import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
-import org.infinispan.notifications.cachelistener.event.Event;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.handler.codec.http.websocket.DefaultWebSocketFrame;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * Cache listener.
- * <p/>
- * Used to notify websocket clients of cache entry updates.
- *
- * @author <a href="mailto:tom.fennelly at gmail.com">tom.fennelly at gmail.com</a>
- */
- at Listener
-public class CacheListener {
-
- private List<ChannelNotifyParams> channels = new CopyOnWriteArrayList<ChannelNotifyParams>();
-
- @CacheEntryCreated
- public void cacheEntryCreated(CacheEntryCreatedEvent event) {
- notifyChannels(event, event.getType());
- }
-
- @CacheEntryModified
- public void cacheEntryModified(CacheEntryModifiedEvent event) {
- notifyChannels(event, event.getType());
- }
-
- @CacheEntryRemoved
- public void cacheEntryRemoved(CacheEntryRemovedEvent event) {
- notifyChannels(event, event.getType());
- }
-
- private void notifyChannels(CacheEntryEvent event, Event.Type eventType) {
- if(event.isPre()) {
- return;
- }
-
- JSONObject jsonObject;
-
- try {
- Cache<Object, Object> cache = event.getCache();
- Object key = event.getKey();
- Object value;
-
- switch(eventType) {
- case CACHE_ENTRY_CREATED:
- // TODO: Add optimization ... don't get from cache if non of the channels are interested in creates...
- value = cache.get(key);
- jsonObject = ChannelUtils.toJSON(key.toString(), value, cache.getName());
- break;
- case CACHE_ENTRY_MODIFIED:
- value = ((CacheEntryModifiedEvent)event).getValue();
- jsonObject = ChannelUtils.toJSON(key.toString(), value, cache.getName());
- break;
- case CACHE_ENTRY_REMOVED:
- jsonObject = ChannelUtils.toJSON(key.toString(), null, cache.getName());
- break;
- default:
- return;
- }
-
- jsonObject.put("eventType", eventType.toString());
- } catch (JSONException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- return;
- }
-
- String jsonString = jsonObject.toString();
- for(ChannelNotifyParams channel : channels) {
- if(channel.channel.isOpen() && channel.onEvents.contains(eventType)) {
- if(channel.key != null) {
- if(event.getKey().equals(channel.key) || channel.key.equals("*")) {
- channel.channel.write(new DefaultWebSocketFrame(jsonString));
- }
- } else {
- channel.channel.write(new DefaultWebSocketFrame(jsonString));
- }
- }
- }
- }
-
- public void addChannel(ChannelNotifyParams channel) {
- if(!channels.contains(channel)) {
- channels.add(channel);
- channel.channel.getCloseFuture().addListener(new ChannelCloseFutureListener());
- }
- }
-
- public void removeChannel(ChannelNotifyParams channel) {
- channels.remove(channel);
- }
-
- public static class ChannelNotifyParams {
-
- private static final String[] DEFAULT_EVENTS = {Event.Type.CACHE_ENTRY_MODIFIED.toString(), Event.Type.CACHE_ENTRY_REMOVED.toString()};
-
- private Channel channel;
- private String key;
- private List<Event.Type> onEvents = new ArrayList<Event.Type>();
-
- public ChannelNotifyParams(Channel channel, String key, String[] onEvents) {
- if(channel == null) {
- throw new IllegalArgumentException("null 'channel' arg in constructor call.");
- }
- String[] onEventsSpec = onEvents;
-
- this.channel = channel;
- this.key = key;
-
- if(onEventsSpec == null) {
- onEventsSpec = DEFAULT_EVENTS;
- }
- for(String eventType : onEventsSpec) {
- try {
- this.onEvents.add(Event.Type.valueOf(eventType));
- } catch(RuntimeException e) {
- // Ignore for now
- }
- }
-
- if(onEvents == null && key.equals("*")) {
- this.onEvents.add(Event.Type.CACHE_ENTRY_CREATED);
- }
- }
-
- @Override
- public boolean equals(Object obj) {
- if(obj instanceof ChannelNotifyParams) {
- ChannelNotifyParams channelNotifyParams = (ChannelNotifyParams) obj;
- if(channelNotifyParams.channel == channel) {
- if(key == null) {
- return (channelNotifyParams.key == null);
- } else {
- return key.equals(channelNotifyParams.key);
- }
- }
- }
-
- return false;
- }
-
- @Override
- public int hashCode() {
- if(key != null) {
- return super.hashCode() + channel.hashCode() + key.hashCode();
- } else {
- return super.hashCode() + channel.hashCode();
- }
- }
- }
-
- private class ChannelCloseFutureListener implements ChannelFutureListener {
-
- public void operationComplete(ChannelFuture channelCloseFuture) throws Exception {
- for(ChannelNotifyParams channel : channels) {
- if(channelCloseFuture.getChannel() == channel.channel) {
- removeChannel(channel);
- }
- }
- }
- }
-}
Deleted: trunk/server/websocket/src/main/java/org/infinispan/websocket/ChannelUtils.java
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/websocket/ChannelUtils.java 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/server/websocket/src/main/java/org/infinispan/websocket/ChannelUtils.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -1,83 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2006, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.websocket;
-
-import org.infinispan.Cache;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.websocket.DefaultWebSocketFrame;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * Channel Utilities.
- *
- * @author <a href="mailto:tom.fennelly at gmail.com">tom.fennelly at gmail.com</a>
- */
-public class ChannelUtils {
-
- /**
- * Push a cache entry value out onto the websocket channel (to the browser).
- * @param key The cache entry key whose value is to be pushed to the browser.
- * @param cache The cache containing the key.
- * @param ctx The channel context associated with the browser websocket channel..
- * @throws JSONException Error generating JSON string.
- */
- public static void pushCacheValue(String key, Cache<Object, Object> cache, ChannelHandlerContext ctx) throws JSONException {
- Object value = cache.get(key);
-
- JSONObject responseObject = toJSON(key, value, cache.getName());
-
- // Write the JSON response out onto the channel...
- ctx.getChannel().write(new DefaultWebSocketFrame(responseObject.toString()));
- }
-
- /**
- * Cache key, value and cache-name to JSON string.
- * @param key The cache key.
- * @param value The cache value.
- * @param cacheName The cache name.
- * @return JSON Object representing a cache entry payload for transmission to the browser channel.
- * @throws JSONException Error generating JSON string.
- */
- public static JSONObject toJSON(String key, Object value, String cacheName) throws JSONException {
- JSONObject jsonObject = new JSONObject();
-
- jsonObject.put(OpHandler.CACHE_NAME, cacheName);
- jsonObject.put(OpHandler.KEY, key);
-
- if(value != null) {
- // Encode the cache value as JSON...
- JSONObject valueObject = new JSONObject(value);
- if(valueObject.get("bytes") == null) {
- jsonObject.put(OpHandler.VALUE, valueObject.toString());
- jsonObject.put(OpHandler.MIME, "application/json");
- } else {
- jsonObject.put(OpHandler.VALUE, value);
- jsonObject.put(OpHandler.MIME, "text/plain");
- }
- } else {
- jsonObject.put(OpHandler.VALUE, JSONObject.NULL);
- }
-
- return jsonObject;
- }
-}
Deleted: trunk/server/websocket/src/main/java/org/infinispan/websocket/OpHandler.java
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/websocket/OpHandler.java 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/server/websocket/src/main/java/org/infinispan/websocket/OpHandler.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -1,50 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2006, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.websocket;
-
-import org.infinispan.Cache;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * Websocket cache operation handler.
- *
- * @author <a href="mailto:tom.fennelly at gmail.com">tom.fennelly at gmail.com</a>
- */
-public interface OpHandler {
-
- public static final String OP_CODE = "opCode";
- public static final String CACHE_NAME = "cacheName";
- public static final String KEY = "key";
- public static final String VALUE = "value";
- public static final String MIME = "mime";
-
- /**
- * Handle a websocket channel operation.
- *
- * @param opPayload Operation payload.
- * @param cache The target cache.
- * @param ctx The Netty websocket channel handler.
- */
- void handleOp(JSONObject opPayload, Cache<Object, Object> cache, ChannelHandlerContext ctx) throws JSONException;
-}
Deleted: trunk/server/websocket/src/main/java/org/infinispan/websocket/WebSocketServer.java
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/websocket/WebSocketServer.java 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/server/websocket/src/main/java/org/infinispan/websocket/WebSocketServer.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -1,188 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2006, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.websocket;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.StringWriter;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-
-import org.infinispan.Cache;
-import org.infinispan.manager.CacheManager;
-import org.infinispan.manager.DefaultCacheManager;
-import org.infinispan.websocket.handlers.GetHandler;
-import org.infinispan.websocket.handlers.NotifyHandler;
-import org.infinispan.websocket.handlers.PutHandler;
-import org.infinispan.websocket.handlers.RemoveHandler;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-
-/**
- * An HTTP server which serves Web Socket requests on an Infinispan cacheManager.
- * <p/>
- * Websocket specific code lifted from Netty WebSocket Server example.
- */
-public class WebSocketServer {
-
- public static final String ORG_INFINISPAN_WS_HOST = "org.infinispan.ws.host";
- public static final String ORG_INFINISPAN_WS_PORT = "org.infinispan.ws.port";
- public static final String ORG_INFINISPAN_WS_CACHE_CONFIG_FILE = "org.infinispan.ws.cache.config-file";
- public static final String ORG_INFINISPAN_WS_CACHE_CONFIG_FILE_DEFAULT = "./infinispan.xml";
- public static final String INFINISPAN_WS_JS_FILENAME = "infinispan-ws.js";
-
- private static String javascript;
-
- public WebSocketServer(InetSocketAddress bindAddress, InputStream cacheConfigStream) throws IOException {
- if(cacheConfigStream == null) {
- throw new IllegalArgumentException("null 'cacheConfigStream' in constructor call.");
- }
- if(bindAddress == null) {
- throw new IllegalArgumentException("null 'bindAddress' in constructor call.");
- }
-
- try {
- CacheManager manager = new DefaultCacheManager(cacheConfigStream);
-
- // Configure the server.
- ServerBootstrap bootstrap = new ServerBootstrap(
- new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
-
- // Set up the event pipeline factory.
- bootstrap.setPipelineFactory(new WebSocketServerPipelineFactory(manager));
-
- // Bind and start to accept incoming connections.
- bootstrap.bind(bindAddress);
- } finally {
- cacheConfigStream.close();
- }
- }
-
- public static void main(String[] args) throws Exception {
- String host = System.getProperty(ORG_INFINISPAN_WS_HOST);
- String portConfig = System.getProperty(ORG_INFINISPAN_WS_PORT, "61999").trim();
- File cacheConfigFile = new File(System.getProperty(ORG_INFINISPAN_WS_CACHE_CONFIG_FILE, ORG_INFINISPAN_WS_CACHE_CONFIG_FILE_DEFAULT).trim());
- int port;
-
- try {
- port = Integer.parseInt(portConfig);
- } catch(NumberFormatException e) {
- throw new IllegalArgumentException("Invalid WebSocket port address '" + portConfig + "'. Must be a valid integer.");
- }
-
- if(!cacheConfigFile.exists()) {
- throw new IllegalArgumentException("Infinispan configuration file '" + cacheConfigFile.getAbsolutePath() + "' is not available.");
- }
- if(!cacheConfigFile.isFile()) {
- throw new IllegalArgumentException("Infinispan configuration file '" + cacheConfigFile.getAbsolutePath() + "' is not a readable file.");
- }
-
- InetSocketAddress inetAddress;
- if(host != null) {
- inetAddress = new InetSocketAddress(host, port);
- } else {
- inetAddress = new InetSocketAddress(port);
- }
-
- new WebSocketServer(inetAddress, new FileInputStream(cacheConfigFile));
-
- System.out.println("Infinispan Websocket Server listening on address '" + inetAddress + "'. Using Cache configuration '" + cacheConfigFile.getAbsolutePath() + "'.");
- System.out.println("Infinispan Websocket Server started.");
- }
-
- private static class WebSocketServerPipelineFactory implements ChannelPipelineFactory {
-
- private CacheManager cacheManager;
- private Map<String, OpHandler> operationHandlers;
- private Map<String, Cache> startedCaches = new ConcurrentHashMap<String, Cache>();
-
- public WebSocketServerPipelineFactory(CacheManager cacheManager) {
- this.cacheManager = cacheManager;
-
- operationHandlers = new HashMap<String, OpHandler>();
- operationHandlers.put("put", new PutHandler());
- operationHandlers.put("get", new GetHandler());
- operationHandlers.put("remove", new RemoveHandler());
- NotifyHandler notifyHandler = new NotifyHandler();
- operationHandlers.put("notify", notifyHandler);
- operationHandlers.put("unnotify", notifyHandler);
- }
-
- public ChannelPipeline getPipeline() throws Exception {
- // Create a default pipeline implementation.
- ChannelPipeline pipeline = Channels.pipeline();
-
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("handler", new WebSocketServerHandler(cacheManager, operationHandlers, startedCaches));
-
- return pipeline;
- }
- }
-
- public static String getJavascript() {
- if(javascript != null) {
- return javascript;
- }
-
- BufferedReader scriptReader = new BufferedReader(new InputStreamReader(WebSocketServer.class.getResourceAsStream(INFINISPAN_WS_JS_FILENAME)));
-
- try {
- StringWriter writer = new StringWriter();
-
- String line = scriptReader.readLine();
- while(line != null) {
- writer.write(line);
- writer.write('\n');
- line = scriptReader.readLine();
- }
-
- javascript = writer.toString();
-
- return javascript;
- } catch (IOException e) {
- throw new IllegalStateException("Unexpected exception while sending Websockets script to client.", e);
- } finally {
- try {
- scriptReader.close();
- } catch (IOException e) {
- throw new IllegalStateException("Unexpected exception while closing Websockets script to client.", e);
- }
- }
- }
-}
\ No newline at end of file
Deleted: trunk/server/websocket/src/main/java/org/infinispan/websocket/WebSocketServerHandler.java
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/websocket/WebSocketServerHandler.java 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/server/websocket/src/main/java/org/infinispan/websocket/WebSocketServerHandler.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -1,213 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2006, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.websocket;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.*;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.*;
-import static org.jboss.netty.handler.codec.http.HttpMethod.*;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.*;
-
-import java.io.StringWriter;
-import java.util.Map;
-
-import org.infinispan.Cache;
-import org.infinispan.manager.CacheManager;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
-import org.jboss.netty.handler.codec.http.HttpHeaders.Values;
-import org.jboss.netty.handler.codec.http.websocket.WebSocketFrame;
-import org.jboss.netty.handler.codec.http.websocket.WebSocketFrameDecoder;
-import org.jboss.netty.handler.codec.http.websocket.WebSocketFrameEncoder;
-import org.jboss.netty.util.CharsetUtil;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * Web Socket Server Handler (Netty).
- * <p/>
- * Websocket specific code lifted from Netty WebSocket Server example.
- */
-public class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
-
- private static final String INFINISPAN_WS_JS_FILENAME = "infinispan-ws.js";
- private CacheManager cacheManager;
- private Map<String, OpHandler> operationHandlers;
- private boolean connectionUpgraded;
- private Map<String, Cache> startedCaches;
-
- public WebSocketServerHandler(CacheManager cacheManager, Map<String, OpHandler> operationHandlers, Map<String, Cache> startedCaches) {
- this.cacheManager = cacheManager;
- this.operationHandlers = operationHandlers;
- this.startedCaches = startedCaches;
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- Object msg = e.getMessage();
- if (msg instanceof HttpRequest) {
- handleHttpRequest(ctx, (HttpRequest) msg);
- } else if (msg instanceof WebSocketFrame) {
- handleWebSocketFrame(ctx, (WebSocketFrame) msg);
- }
- }
-
- private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
- // Allow only GET methods.
- if (req.getMethod() != GET) {
- sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, FORBIDDEN));
- return;
- }
-
- if(!connectionUpgraded && req.getUri().equalsIgnoreCase("/" + INFINISPAN_WS_JS_FILENAME)) {
- DefaultHttpResponse res = new DefaultHttpResponse(HTTP_1_1, OK);
- loadScriptToResponse(req, res);
- sendHttpResponse(ctx, req, res);
- return;
- } else if (Values.UPGRADE.equalsIgnoreCase(req.getHeader(CONNECTION)) &&
- WEBSOCKET.equalsIgnoreCase(req.getHeader(Names.UPGRADE))) {
-
- // Serve the WebSocket handshake request.
-
- // Create the WebSocket handshake response.
- HttpResponse res = new DefaultHttpResponse(HTTP_1_1, new HttpResponseStatus(101, "Web Socket Protocol Handshake"));
- res.addHeader(Names.UPGRADE, WEBSOCKET);
- res.addHeader(CONNECTION, Values.UPGRADE);
- res.addHeader(WEBSOCKET_ORIGIN, req.getHeader(ORIGIN));
- res.addHeader(WEBSOCKET_LOCATION, getWebSocketLocation(req));
- String protocol = req.getHeader(WEBSOCKET_PROTOCOL);
- if (protocol != null) {
- res.addHeader(WEBSOCKET_PROTOCOL, protocol);
- }
-
- // Upgrade the connection and send the handshake response.
- ChannelPipeline p = ctx.getChannel().getPipeline();
- p.remove("aggregator");
- p.replace("decoder", "wsdecoder", new WebSocketFrameDecoder());
-
- ctx.getChannel().write(res);
-
- p.replace("encoder", "wsencoder", new WebSocketFrameEncoder());
- return;
- }
-
- // Send an error page otherwise.
- sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, FORBIDDEN));
- }
-
- private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
- try {
- JSONObject payload = new JSONObject(frame.getTextData());
- String opCode = (String) payload.get(OpHandler.OP_CODE);
- String cacheName = (String) payload.opt(OpHandler.CACHE_NAME);
- Cache<Object, Object> cache = getCache(cacheName);
-
- OpHandler handler = operationHandlers.get(opCode);
- if(handler != null) {
- handler.handleOp(payload, cache, ctx);
- }
- } catch (JSONException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- private Cache<Object, Object> getCache(final String cacheName) {
- String key = cacheName;
- Cache<Object, Object> cache;
-
- if(key == null) {
- key = "";
- }
-
- cache = startedCaches.get(key);
-
- if(cache == null) {
- synchronized (startedCaches) {
- cache = startedCaches.get(key);
- if(cache == null) {
- if(cacheName != null) {
- cache = cacheManager.getCache(key);
- } else {
- cache = cacheManager.getCache();
- }
- startedCaches.put(key, cache);
- cache.start();
- }
- }
- }
-
- return cache;
- }
-
- private void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
- // Generate an error page if response status code is not OK (200).
- if (res.getStatus().getCode() != 200) {
- res.setContent(ChannelBuffers.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8));
- setContentLength(res, res.getContent().readableBytes());
- }
-
- // Send the response and close the connection if necessary.
- ChannelFuture f = ctx.getChannel().write(res);
- if (!isKeepAlive(req) || res.getStatus().getCode() != 200) {
- f.addListener(ChannelFutureListener.CLOSE);
- }
- }
-
- private void loadScriptToResponse(HttpRequest req, DefaultHttpResponse res) {
- String wsAddress = getWebSocketLocation(req);
-
- StringWriter writer = new StringWriter();
- writer.write("var defaultWSAddress = '" + wsAddress + "';");
- writer.write(WebSocketServer.getJavascript());
-
- ChannelBuffer content = ChannelBuffers.copiedBuffer(writer.toString(), CharsetUtil.UTF_8);
-
- res.setHeader(CONTENT_TYPE, "text/javascript; charset=UTF-8");
- setContentLength(res, content.readableBytes());
- res.setContent(content);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- e.getCause().printStackTrace();
- e.getChannel().close();
- }
-
- private String getWebSocketLocation(HttpRequest req) {
- return "ws://" + req.getHeader(HttpHeaders.Names.HOST) + "/";
- }
-}
\ No newline at end of file
Copied: trunk/server/websocket/src/main/resources/org/infinispan/server/websocket/infinispan-ws.js (from rev 1738, trunk/server/websocket/src/main/resources/org/infinispan/websocket/infinispan-ws.js)
===================================================================
--- trunk/server/websocket/src/main/resources/org/infinispan/server/websocket/infinispan-ws.js (rev 0)
+++ trunk/server/websocket/src/main/resources/org/infinispan/server/websocket/infinispan-ws.js 2010-05-05 17:32:57 UTC (rev 1742)
@@ -0,0 +1,126 @@
+ // defaultWSAddress is inserted by the WS Server, which serves this script.
+
+function Cache(cacheName, wsAddress) {
+
+ var websocket;
+ var queuedMessages = [];
+ var callback;
+
+ openWebSocket();
+
+ function openWebSocket() {
+ if (window.WebSocket) {
+ if(wsAddress == null) {
+ wsAddress = defaultWSAddress;
+ }
+
+ websocket = new WebSocket(wsAddress);
+
+ websocket.onopen = function(event) {
+ for (i in queuedMessages) {
+ send(queuedMessages[i]);
+ }
+ queuedMessages = null;
+ }
+ } else {
+ alert("Sorry, cannot connect to Infinispan Cache. Your browser does not support WebSocket.");
+ }
+
+ websocket.onmessage = function(event) {
+ var jsonObj = JSON.parse(event.data);
+
+ if(jsonObj.value != null) {
+ if(jsonObj.mime == "application/json") {
+ var decodedObj = JSON.parse(jsonObj.value);
+ callback(jsonObj.key, decodedObj);
+ } else if(jsonObj.mime == "text/plain") {
+ callback(jsonObj.key, jsonObj.value);
+ }
+ } else {
+ callback(jsonObj.key, null);
+ }
+ };
+ }
+
+ this.registerCallback = function (callbackFunction) {
+ callback = callbackFunction;
+ }
+
+ this.put = function (key, value) {
+ var encodedObject = JSON.stringify(value);
+ if(encodedObject.charAt(0) == '{') {
+ put(key, encodedObject, "application/json") ;
+ } else {
+ put(key, value, "text/plain") ;
+ }
+ }
+
+ this.get = function (key) {
+ var jsonObj = {
+ "opCode" : "get",
+ "cacheName" : cacheName,
+ "key" : key,
+ };
+
+ send(jsonObj);
+ }
+
+ this.remove = function (key) {
+ var jsonObj = {
+ "opCode" : "remove",
+ "cacheName" : cacheName,
+ "key" : key,
+ };
+
+ send(jsonObj);
+ }
+
+ this.notify = function (key, onEvents) {
+ var jsonObj = {
+ "opCode" : "notify",
+ "cacheName" : cacheName,
+ "key" : key,
+ "onEvents" : onEvents
+ };
+
+ send(jsonObj);
+ }
+
+ this.unnotify = function (key) {
+ var jsonObj = {
+ "opCode" : "unnotify",
+ "cacheName" : cacheName,
+ "key" : key,
+ };
+
+ send(jsonObj);
+ }
+
+ function put(key, value, mimeType) {
+ var jsonObj = {
+ "opCode" : "put",
+ "cacheName" : cacheName,
+ "key" : key,
+ "value" : value,
+ "mime" : mimeType
+ };
+
+ send(jsonObj);
+ }
+
+ function send(jsonObj) {
+ if (websocket.readyState == WebSocket.OPEN) {
+ var jsonString = JSON.stringify(jsonObj);
+ websocket.send(jsonString);
+ } else {
+ if(queuedMessages == null) {
+ // reopen the websocket...
+ openWebSocket();
+ queuedMessages = [];
+ }
+
+ // Queue the message for sending once the socket is open...
+ queuedMessages[queuedMessages.length] = jsonObj;
+ }
+ }
+}
Copied: trunk/server/websocket/src/test/java/org/infinispan/server/websocket/handlers (from rev 1738, trunk/server/websocket/src/test/java/org/infinispan/websocket/handlers)
Modified: trunk/server/websocket/src/test/java/org/infinispan/server/websocket/handlers/MockClient.java
===================================================================
--- trunk/server/websocket/src/test/java/org/infinispan/websocket/handlers/MockClient.java 2010-05-05 13:02:55 UTC (rev 1738)
+++ trunk/server/websocket/src/test/java/org/infinispan/server/websocket/handlers/MockClient.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -19,14 +19,14 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.infinispan.websocket.handlers;
+package org.infinispan.server.websocket.handlers;
import org.infinispan.Cache;
import org.infinispan.manager.CacheManager;
import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.server.websocket.OpHandler;
import org.infinispan.websocket.MockChannel;
import org.infinispan.websocket.MockChannelHandlerContext;
-import org.infinispan.websocket.OpHandler;
import org.json.JSONException;
import org.json.JSONObject;
Modified: trunk/server/websocket/src/test/java/org/infinispan/server/websocket/handlers/OpHandlerTest.java
===================================================================
--- trunk/server/websocket/src/test/java/org/infinispan/websocket/handlers/OpHandlerTest.java 2010-05-05 13:02:55 UTC (rev 1738)
+++ trunk/server/websocket/src/test/java/org/infinispan/server/websocket/handlers/OpHandlerTest.java 2010-05-05 17:32:57 UTC (rev 1742)
@@ -19,10 +19,10 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.infinispan.websocket.handlers;
+package org.infinispan.server.websocket.handlers;
+import org.infinispan.server.websocket.OpHandler;
import org.infinispan.websocket.MockChannel;
-import org.infinispan.websocket.OpHandler;
import org.json.JSONException;
import org.json.JSONObject;
import org.testng.annotations.Test;
@@ -32,10 +32,9 @@
*
* @author <a href="mailto:tom.fennelly at gmail.com">tom.fennelly at gmail.com</a>
*/
- at Test
+ at Test (testName = "websocket.handlers.OpHandlerTest", groups = "unit")
public class OpHandlerTest {
- @Test
public void test() throws JSONException {
MockChannel mockChannel = new MockChannel();
MockClient firstCacheClient = new MockClient("firstCache", mockChannel);
Modified: trunk/src/main/resources/assemblies/all.xml
===================================================================
--- trunk/src/main/resources/assemblies/all.xml 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/src/main/resources/assemblies/all.xml 2010-05-05 17:32:57 UTC (rev 1742)
@@ -148,6 +148,7 @@
<include>org.infinispan:infinispan-server-rest</include>
<include>org.infinispan:infinispan-server-memcached</include>
<include>org.infinispan:infinispan-server-hotrod</include>
+ <include>org.infinispan:infinispan-server-websocket</include>
<include>org.infinispan:infinispan-client-hotrod</include>
<include>org.infinispan:infinispan-lucene-directory</include>
<include>org.infinispan:infinispan-lucene-demo</include>
@@ -185,6 +186,15 @@
</fileSet>
<fileSet>
+ <directory>src/main/distribution</directory>
+ <outputDirectory>etc</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ <includes>
+ <include>**/*.html</include>
+ </includes>
+ </fileSet>
+
+ <fileSet>
<directory>src/main/resources</directory>
<outputDirectory>bin</outputDirectory>
<lineEnding>dos</lineEnding>
Modified: trunk/src/main/resources/assemblies/bin.xml
===================================================================
--- trunk/src/main/resources/assemblies/bin.xml 2010-05-05 17:13:23 UTC (rev 1741)
+++ trunk/src/main/resources/assemblies/bin.xml 2010-05-05 17:32:57 UTC (rev 1742)
@@ -138,6 +138,8 @@
<include>org.infinispan:infinispan-lucene-directory</include>
<include>org.infinispan:infinispan-server-memcached</include>
<include>org.infinispan:infinispan-server-hotrod</include>
+ <include>org.infinispan:infinispan-server-websocket</include>
+ <include>org.infinispan:infinispan-client-hotrod</include>
</includes>
<sources>
<includeModuleDirectory>false</includeModuleDirectory>
@@ -172,6 +174,15 @@
</fileSet>
<fileSet>
+ <directory>src/main/distribution</directory>
+ <outputDirectory>etc</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ <includes>
+ <include>**/*.html</include>
+ </includes>
+ </fileSet>
+
+ <fileSet>
<directory>src/main/resources</directory>
<outputDirectory>bin</outputDirectory>
<lineEnding>dos</lineEnding>
@@ -241,6 +252,57 @@
</binaries>
</moduleSet>
+ <!-- For server-core module, only add scripts -->
+ <moduleSet>
+ <includeSubModules>false</includeSubModules>
+ <includes>
+ <include>org.infinispan:infinispan-server-core</include>
+ </includes>
+ <sources>
+ <includeModuleDirectory>false</includeModuleDirectory>
+
+ <fileSets>
+
+ <!-- Executable resources -->
+ <fileSet>
+ <directory>src/main/resources</directory>
+ <outputDirectory>bin</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ <includes>
+ <include>**/*.sh</include>
+ <include>**/*.py</include>
+ <include>**/*.rb</include>
+ </includes>
+ <fileMode>0777</fileMode>
+ </fileSet>
+
+ <fileSet>
+ <directory>src/main/resources</directory>
+ <outputDirectory>bin</outputDirectory>
+ <lineEnding>dos</lineEnding>
+ <includes>
+ <include>**/*.cmd</include>
+ <include>**/*.bat</include>
+ </includes>
+ <fileMode>0777</fileMode>
+ </fileSet>
+
+ <!-- EULAs and license files -->
+ <fileSet>
+ <directory>src/main/release</directory>
+ <outputDirectory></outputDirectory>
+ <lineEnding>dos</lineEnding>
+ <includes>
+ <include>**/*.txt</include>
+ </includes>
+ </fileSet>
+
+ </fileSets>
+
+ </sources>
+
+ </moduleSet>
+
</moduleSets>
<fileSets>
More information about the infinispan-commits
mailing list