How to implement Streaming server like The Bayeux Protocol

sumitatgharkikhoj mailtosumitsingh at gmail.com
Wed Nov 25 11:40:47 EST 2009


I looked further at ChunkedFile.java; when the file is very large, it is no
different from what I am doing.
Anyway, I also found the point of failure:

The problem is with the Thread.sleep call. Even if I use blocking
collection classes instead, my server gets stuck after two clients connect.

When I tried the same test with Jetty + NIO, it worked fine (even with
sleep).

So the following works:
1) Same code without sleep
2) Same code without wait/notify or blocking collection classes
3) Jetty with sleep, blocking collection classes + NIO

The following does not work:
1) Same code base with sleep
2) Same code base with blocking collection classes
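
One plausible reading of the two lists above, offered as an assumption rather
than a confirmed diagnosis: if Netty invokes nextChunk() on one of its I/O
worker threads, then Thread.sleep (or a blocking take() on a queue) parks that
worker, and every other connection assigned to it stalls too. The sketch below
reproduces that effect with plain java.util.concurrent and no Netty at all; the
class and method names (SleepStarvationDemo, servedWithSleep, servedWithTimer)
are hypothetical, and the fixed pool of 2 threads merely stands in for a small
set of I/O workers.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SleepStarvationDemo {

    // Waits on the calling thread without exposing a checked exception.
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Each client loops forever on a pool thread and sleeps between chunks,
    // like a nextChunk() that calls Thread.sleep. Returns how many clients
    // produced at least one chunk within the observation window.
    static int servedWithSleep(int workers, int clients, long windowMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        Set<Integer> served = ConcurrentHashMap.newKeySet();
        for (int c = 0; c < clients; c++) {
            final int id = c;
            pool.execute(() -> {
                try {
                    while (true) {
                        served.add(id);     // stands in for writing one chunk
                        Thread.sleep(20);   // parks this worker thread
                    }
                } catch (InterruptedException stop) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pause(windowMillis);
        int count = served.size();
        pool.shutdownNow();                 // interrupts the endless loops
        return count;
    }

    // Same workload, but each chunk is a short scheduled task, so no thread
    // is held between chunks and all clients share the same few threads.
    static int servedWithTimer(int workers, int clients, long windowMillis) {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(workers);
        Set<Integer> served = ConcurrentHashMap.newKeySet();
        for (int c = 0; c < clients; c++) {
            final int id = c;
            timer.scheduleAtFixedRate(() -> served.add(id), 0, 20, TimeUnit.MILLISECONDS);
        }
        pause(windowMillis);
        int count = served.size();
        timer.shutdownNow();
        return count;
    }

    public static void main(String[] args) {
        System.out.println("sleeping in the task: " + servedWithSleep(2, 4, 300) + " of 4 clients served");
        System.out.println("scheduled ticks:      " + servedWithTimer(2, 4, 300) + " of 4 clients served");
    }
}
```

With 2 workers and 4 clients, the sleeping variant cannot serve more than 2
clients, because the first two loops never give their threads back; the timer
variant serves all 4 on the same number of threads. In a real handler the
equivalent move would be to produce the next chunk from a timer or a separate
producer thread instead of sleeping inside nextChunk(), but that is untested
here.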

This is really critical for me, as I am using Netty for my web services, where
it is working amazingly well, and I cannot mix and match Netty and Jetty.

sumitatgharkikhoj wrote:
> 
> First of all, thank you for the nice framework, forum and documentation.
> My problem is that I am trying to write an HTTP server with a small
> footprint (it will go in as a component of a bigger framework), but it
> needs to have streaming support.
> Different clients will connect at different times, ranging from 100 to
> 1000 clients at most, and this server will be used for both streaming and
> short data.
> The data is streamed continuously (24/7) using an XHR multipart request.
> But when I used the example code (please see below), the server blocks
> after the first thread.
> The other option I tried was Jetty with continuations or CometD, but I am
> more inclined towards Netty (since I want to replace my MINA UDP layer with
> Netty and maintain one single library).
> 
> Any help will be appreciated. I have my machine available via WebEx to
> debug as well.
> 
> 
> My server blocks at the second request in the following code:
> 
> 1) In my HTTP server:
>     ServerBootstrap bootstrap = new ServerBootstrap(
>             new NioServerSocketChannelFactory(
>                     Executors.newCachedThreadPool(),
>                     Executors.newCachedThreadPool()));
>     bootstrap.setPipelineFactory(new HttpServerPipelineFactory(requestHandler));
>     bootstrap.bind(new InetSocketAddress(8080));
> 2) In my pipeline factory:
>     public ChannelPipeline getPipeline() throws Exception {
>         ChannelPipeline pipeline = pipeline();
>         pipeline.addLast("decoder", new HttpRequestDecoder());
>         pipeline.addLast("encoder", new HttpResponseEncoder());
>         pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
>         pipeline.addLast("handler", requestHandler);
>         return pipeline;
>     }
> 3) My HTTP request handler sends events using the following code:
> 	private void writeResponse(MessageEvent e) {
> 		ChannelBuffer buf =
> 				ChannelBuffers.copiedBuffer(responseContent.toString(), "UTF-8");
> 		buf.setBytes(0, responseContent.toString().getBytes() );
> 		responseContent.setLength(0);
> 		boolean close =  HttpHeaders.Values.CLOSE.equalsIgnoreCase(request
> 				.getHeader(HttpHeaders.Names.CONNECTION))
> 				|| request.getProtocolVersion().equals(HttpVersion.HTTP_1_0)
> 				&& !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(request
> 						.getHeader(HttpHeaders.Names.CONNECTION));
> 
> 		HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
> 				HttpResponseStatus.OK);
> 		//response.setContent(buf);
> 		//response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
> 		response.setHeader(HttpHeaders.Names.CONTENT_TYPE,
> 				"multipart/x-mixed-replace;boundary=XXoXoX");
> 		//if (!close) 
> 		{
> 			response.setHeader("Connection", "Keep-Alive");
> 			response.setHeader("Cache-Control", "no-cache");
> 			response.setHeader("Access-Control-Allow-Origin", "*");
> 		}
> 
> 		String cookieString = request.getHeader(HttpHeaders.Names.COOKIE);
> 		if (cookieString != null) {
> 			CookieDecoder cookieDecoder = new CookieDecoder();
> 			Set<Cookie> cookies = cookieDecoder.decode(cookieString);
> 			if (!cookies.isEmpty()) {
> 				CookieEncoder cookieEncoder = new CookieEncoder(true);
> 				for (Cookie cookie : cookies) {
> 					cookieEncoder.addCookie(cookie);
> 				}
> 				response.addHeader(HttpHeaders.Names.SET_COOKIE,
> 						cookieEncoder.encode());
> 			}
> 		}
> 		ChannelFuture future =null;
> 		future = e.getChannel().write(response);
> 		String temp ="";
> 		
> 		if(future.isSuccess()==false)
> 			return;
> 		
> 		e.getChannel().write(new ChunkedInput() {
> 			
> 			int i = 0;
> 			public Object nextChunk() throws Exception {
> 				i++;
> 				String temp ="";
> 				//temp ="--XXoXoX\r\n";
> 				temp = "Content-Type: text/plain \r\n\r\n";
> 				temp += "{\"data\":\""+i+"\"}";
> 				temp +="\r\n--XXoXoX\r\n";
> 				ChannelBuffer buf2 = ChannelBuffers.copiedBuffer(temp.toString(),
> 						"UTF-8");
> 				Thread.sleep(100);
> 				return buf2;
> 			}
> 			
> 			@Override
> 			public boolean hasNextChunk() throws Exception {
> 					return true;
> 			}
> 			
> 			@Override
> 			public void close() throws Exception {
> 				// TODO Auto-generated method stub
> 				
> 			}
> 		});
> 		
> 		if (close) {
> 			future.addListener(ChannelFutureListener.CLOSE);
> 		}
> 	}
> 
> 
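
The part framing in the quoted nextChunk() can be pulled out into a small
helper. This is a minimal sketch under stated assumptions, not the poster's
code: the class and method names (MultipartPartSketch, framePart) are
hypothetical, and it assumes each part should open with its own "--XXoXoX"
delimiter line (as in generic MIME multipart framing) instead of appending the
delimiter after the body as the quoted code does.

```java
import java.nio.charset.StandardCharsets;

final class MultipartPartSketch {

    // Same boundary string as the Content-Type header in the quoted code.
    static final String BOUNDARY = "XXoXoX";

    // Hypothetical helper: frames one JSON payload as a single part.
    // Order: delimiter line, part header, blank line, body, trailing CRLF.
    static byte[] framePart(int counter) {
        String part = "--" + BOUNDARY + "\r\n"
                + "Content-Type: text/plain\r\n"
                + "\r\n"
                + "{\"data\":\"" + counter + "\"}\r\n";
        return part.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Prints the raw bytes of the first part as text.
        System.out.print(new String(framePart(1), StandardCharsets.UTF_8));
    }
}
```

Emitting the delimiter first means the very first part is also preceded by a
boundary, which the quoted version skips because that line is commented out.
Whether a given XHR client cares about the difference is not something this
sketch establishes.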

-- 
View this message in context: http://n2.nabble.com/How-to-implement-Streaming-server-like-The-Bayeux-Protocol-tp4055722p4066059.html
Sent from the Netty User Group mailing list archive at Nabble.com.

