LiveGraph
data visualisation and analysis framework

org.LiveGraph.dataFile.read
Class PipedInputStream

java.lang.Object
  extended by java.io.InputStream
      extended by org.LiveGraph.dataFile.read.PipedInputStream
All Implemented Interfaces:
java.io.Closeable

public class PipedInputStream
extends java.io.InputStream

This class makes Java's own PipedInputStream fit for reading by multiple Threads as required for LiveGraph.

The thread handling built into Java's java.io.PipedInputStream leaves, at the very best, room for improvement. Sun seems to be aware of the problem: for instance, a Java API developer writes directly in the source comments of the officially distributed JDK 1.6.0 source package: "[...] identification of the read and write sides needs to be more sophisticated [...]" (see the non-Javadoc comments in the source for java.io.PipedInputStream, JDK 1.6.0, lines 38-41). However, to date the problem remains.

For LiveGraph specifically, the problem is that java.io.PipedInputStream remembers the Thread that performed the latest read operation and checks, before the following receive operation from the PipedOutputStream, whether that Thread is still alive. However, LiveGraph creates a new Thread for each update to make sure that the application remains responsive even if the amount of new data is large. The old threads are discarded, which causes java.io.PipedInputStream to throw an exception.
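The dead-reader problem described above can be reproduced with the standard java.io classes alone. The following is a minimal sketch, not LiveGraph code; the class name DeadReaderSketch and its method are hypothetical names chosen for illustration. A short-lived thread reads one byte and exits, and the next write on the pipe is then expected to fail.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

/** Hypothetical demo class; uses only the standard java.io piped streams. */
class DeadReaderSketch {

    /** Returns true if a write fails after the only reader thread has exited. */
    static boolean writeFailsAfterReaderExits() throws IOException, InterruptedException {
        PipedOutputStream src = new PipedOutputStream();
        PipedInputStream snk = new PipedInputStream(src);

        src.write(1); // fits into the pipe buffer, so this does not block

        // A short-lived reader thread, similar to one update thread per refresh.
        Thread reader = new Thread(() -> {
            try {
                snk.read();
            } catch (IOException ignored) {
                // not expected in this sketch
            }
        });
        reader.start();
        reader.join(); // the reader thread is no longer alive after this point

        try {
            src.write(2); // the pipe checks whether the last reader is alive
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("write failed: " + writeFailsAfterReaderExits());
    }
}
```

The data itself is still intact in the buffer when the write fails; the failure is triggered only by the liveness check on the remembered reader thread.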
As a second problem, java.io.PipedInputStream causes the write call of the PipedOutputStream to block indefinitely if the memory buffer is full. If the LiveGraph update frequency is set too low, the buffer may fill up, which would cause the data-producing part of the application to block. This is highly undesirable: while a short, time-limited block may be acceptable, an exception should be thrown if the buffer remains full for a long time, to indicate to the developer that the chosen buffer size is not sufficiently large for the particular application.
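The time-limited blocking described above can be sketched as follows. This is an illustrative sketch only and not the implementation of this class: TimedRingBuffer, its fields and its methods are hypothetical names, and a plain IOException stands in for whatever exception type the real class throws.

```java
import java.io.IOException;

/** Hypothetical sketch: a bounded byte buffer whose writer gives up after a time limit. */
class TimedRingBuffer {
    private final byte[] buffer;
    private final long maxBlockMillis;
    private int count = 0; // number of bytes currently stored
    private int in = 0;    // next write position
    private int out = 0;   // next read position

    TimedRingBuffer(int size, long maxBlockMillis) {
        if (size <= 0) throw new IllegalArgumentException("size must be positive");
        this.buffer = new byte[size];
        this.maxBlockMillis = maxBlockMillis;
    }

    /** Stores one byte; waits at most maxBlockMillis for free space, then throws. */
    synchronized void write(int b) throws IOException, InterruptedException {
        long deadline = System.currentTimeMillis() + maxBlockMillis;
        while (count == buffer.length) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new IOException("buffer still full after " + maxBlockMillis + " ms");
            }
            wait(remaining); // woken early by poll(), or returns after the timeout
        }
        buffer[in] = (byte) b;
        in = (in + 1) % buffer.length;
        count++;
        notifyAll();
    }

    /** Returns the next byte as a value in 0..255, or -1 if the buffer is currently empty. */
    synchronized int poll() {
        if (count == 0) return -1;
        int b = buffer[out] & 0xFF;
        out = (out + 1) % buffer.length;
        count--;
        notifyAll(); // wake a writer that is waiting for space
        return b;
    }

    public static void main(String[] args) throws Exception {
        TimedRingBuffer rb = new TimedRingBuffer(2, 100);
        rb.write(1);
        rb.write(2);
        try {
            rb.write(3); // nobody reads, so the wait for space times out
        } catch (IOException e) {
            System.out.println("writer gave up: " + e.getMessage());
        }
    }
}
```

Recomputing the remaining time on every loop iteration keeps the total wait bounded even when the thread is woken up several times without space becoming available.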

Unfortunately, the choice of access modifiers for several methods in java.io.PipedInputStream is less than perfect. For instance, the method receive(byte b[], int off, int len) has package scope and cannot be overridden to resolve the above issues. In addition, the inappropriate use of package-visible variables such as connected instead of getter and setter methods makes overriding attempts useless. This forces LiveGraph to subclass InputStream directly to create a better version of a piped input stream and to reimplement all of java.io.PipedInputStream's methods, thus unnecessarily replicating a lot of code. This may become unnecessary in future if the above problems are resolved or if LiveGraph is adapted to use the java.nio channel-based I/O instead of the traditional java.io stream-based approach (this would be a good idea anyway, if time permits making these changes at some point in the future). For now, the source code of this class is copied from the java.io.PipedInputStream that is distributed in the source package for JDK 1.6.0 and changed where necessary.

LiveGraph (http://www.live-graph.org).

Copyright (c) 2007-2008 by G. Paperin.

File: PipedInputStream.java

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following terms and conditions are met:

1. Redistributions of source code must retain the above acknowledgement of the LiveGraph project and its web-site, the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above acknowledgement of the LiveGraph project and its web-site, the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
3. All advertising materials mentioning features or use of this software or any derived software must display the following acknowledgement:
This product includes software developed by the LiveGraph project and its contributors.
(http://www.live-graph.org)

4. All advertising materials distributed in form of HTML pages or any other technology permitting active hyper-links that mention features or use of this software or any derived software must display the acknowledgment specified in condition 3 of this agreement, and in addition, include a visible and working hyper-link to the LiveGraph homepage (http://www.live-graph.org).

THIS SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS, CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Version:
"2.0.beta01"
Author:
Greg Paperin (http://www.paperin.org)
See Also:
PipedInputStream

Field Summary
private  byte[] buffer
          The circular buffer into which incoming data is placed.
private  boolean closedByReader
           
private  boolean closedByWriter
           
private  boolean connected
           
private static long DEFAULT_MAX_BLOCK_DURATION
           
private static int DEFAULT_PIPE_SIZE
           
private  int in
          The index of the position in the circular buffer at which the next byte of data will be stored when received from the connected piped output stream.
private  long maxBlockDuration
           
private  int out
          The index of the position in the circular buffer at which the next byte of data will be read by this piped input stream.
private  long poolingPeriod
           
 
Constructor Summary
PipedInputStream()
          Creates a PipedInputStream so that it is not yet connected.
PipedInputStream(int pipeSize)
          Creates a PipedInputStream so that it is not yet connected and uses the specified pipe size for the pipe's buffer.
PipedInputStream(PipedOutputStream src)
          Creates a PipedInputStream so that it is connected to the piped output stream src.
PipedInputStream(PipedOutputStream src, int pipeSize)
          Creates a PipedInputStream so that it is connected to the piped output stream src and uses the specified pipe size for the pipe's buffer.
 
Method Summary
 int available()
          Returns the number of bytes that can be read from this input stream without blocking.
private  void awaitSpace()
           
private  void checkStateForReceive()
           
 void close()
          Closes this piped input stream and releases any system resources associated with the stream.
 void connect(PipedOutputStream src)
          Causes this piped input stream to be connected to the piped output stream src.
protected  boolean getClosedByReader()
           
protected  boolean getClosedByWriter()
           
 boolean getConnected()
           
 long getMaxBlockDuration()
           
private  void initPipe(int pipeSize)
           
 int read()
          Reads the next byte of data from this piped input stream.
 int read(byte[] b, int off, int len)
          Reads up to len bytes of data from this piped input stream into an array of bytes.
 void receive(byte[] b, int off, int len)
          Receives data into an array of bytes.
 void receive(int b)
          Receives a byte of data.
 void receivedLast()
          Notifies all waiting threads that the last byte of data has been received.
protected  void setClosedByReader(boolean v)
           
protected  void setClosedByWriter(boolean v)
           
protected  void setConnected(boolean v)
           
 void setMaxBlockDuration(long v)
           
 
Methods inherited from class java.io.InputStream
mark, markSupported, read, reset, skip
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_PIPE_SIZE

private static final int DEFAULT_PIPE_SIZE
See Also:
Constant Field Values

DEFAULT_MAX_BLOCK_DURATION

private static final long DEFAULT_MAX_BLOCK_DURATION
See Also:
Constant Field Values

closedByWriter

private boolean closedByWriter

closedByReader

private boolean closedByReader

connected

private boolean connected

maxBlockDuration

private long maxBlockDuration

poolingPeriod

private long poolingPeriod

buffer

private byte[] buffer
The circular buffer into which incoming data is placed.


in

private int in
The index of the position in the circular buffer at which the next byte of data will be stored when received from the connected piped output stream. in<0 implies the buffer is empty; in==out implies the buffer is full.


out

private int out
The index of the position in the circular buffer at which the next byte of data will be read by this piped input stream.
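The conventions for in and out (in<0 means empty, in==out means full) can be illustrated with a small single-threaded sketch. It is modelled on the index handling used in java.io.PipedInputStream and is an assumption about how this class behaves, not a copy of its code; RingIndexSketch and its methods are hypothetical names, and blocking is replaced by simple return values.

```java
/** Hypothetical single-threaded sketch of the in/out index convention. */
class RingIndexSketch {
    private final byte[] buffer;
    private int in = -1; // in < 0: the buffer is empty
    private int out = 0;

    RingIndexSketch(int size) {
        if (size <= 0) throw new IllegalArgumentException("size must be positive");
        buffer = new byte[size];
    }

    boolean isEmpty() { return in < 0; }

    boolean isFull() { return in == out; }

    /** Stores a byte; returns false instead of blocking when the buffer is full. */
    boolean put(int b) {
        if (isFull()) return false;
        if (in < 0) {            // first byte after an empty state: restart at index 0
            in = 0;
            out = 0;
        }
        buffer[in++] = (byte) (b & 0xFF);
        if (in >= buffer.length) in = 0; // wrap around
        return true;
    }

    /** Returns the next byte as a value in 0..255, or -1 when the buffer is empty. */
    int take() {
        if (isEmpty()) return -1;
        int b = buffer[out++] & 0xFF;
        if (out >= buffer.length) out = 0; // wrap around
        if (in == out) in = -1;            // reader caught up with writer: mark as empty
        return b;
    }

    /** Number of bytes that can be taken right now. */
    int available() {
        if (in < 0) return 0;
        if (in == out) return buffer.length;
        return in > out ? in - out : in + buffer.length - out;
    }

    public static void main(String[] args) {
        RingIndexSketch r = new RingIndexSketch(3);
        r.put(10);
        r.put(20);
        System.out.println("available: " + r.available());
    }
}
```

Because in==out is reserved for the full state, the empty state needs its own marker, which is why take() resets in to -1 as soon as the reader has consumed the last stored byte.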

Constructor Detail

PipedInputStream

public PipedInputStream()
Creates a PipedInputStream so that it is not yet connected. It must be connected to a PipedOutputStream before being used.


PipedInputStream

public PipedInputStream(int pipeSize)
Creates a PipedInputStream so that it is not yet connected and uses the specified pipe size for the pipe's buffer. It must be connected to a PipedOutputStream before being used.

Parameters:
pipeSize - the size of the pipe's buffer.
Throws:
java.lang.IllegalArgumentException - if pipeSize <= 0.

PipedInputStream

public PipedInputStream(PipedOutputStream src)
                 throws java.io.IOException
Creates a PipedInputStream so that it is connected to the piped output stream src. Data bytes written to src will then be available as input from this stream.

Parameters:
src - the stream to connect to.
Throws:
java.io.IOException - if an I/O error occurs.

PipedInputStream

public PipedInputStream(PipedOutputStream src,
                        int pipeSize)
                 throws java.io.IOException
Creates a PipedInputStream so that it is connected to the piped output stream src and uses the specified pipe size for the pipe's buffer. Data bytes written to src will then be available as input from this stream.

Parameters:
src - the stream to connect to.
pipeSize - the size of the pipe's buffer.
Throws:
java.io.IOException - if an I/O error occurs.
java.lang.IllegalArgumentException - if pipeSize <= 0.
Method Detail

initPipe

private void initPipe(int pipeSize)

connect

public void connect(PipedOutputStream src)
             throws java.io.IOException
Causes this piped input stream to be connected to the piped output stream src. If this object is already connected to some other piped output stream, an IOException is thrown.
If src is an unconnected piped output stream and snk is an unconnected piped input stream, they may be connected by either the call:
snk.connect(src)

or the call:
src.connect(snk)

The two calls have the same effect.

Parameters:
src - The piped output stream to connect to.
Throws:
java.io.IOException - if an I/O error occurs.
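The two equivalent ways of connecting can be sketched as follows. The sketch uses the standard java.io.PipedInputStream and java.io.PipedOutputStream as stand-ins, on the assumption that the LiveGraph classes follow the same connection contract; ConnectSketch and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

/** Hypothetical demo class; java.io streams stand in for the LiveGraph classes. */
class ConnectSketch {

    /** Connects from the input side, then passes one byte through the pipe. */
    static int viaSink(int value) throws IOException {
        PipedOutputStream src = new PipedOutputStream();
        PipedInputStream snk = new PipedInputStream();
        snk.connect(src);
        src.write(value);
        src.close();
        return snk.read();
    }

    /** Connects from the output side; the effect is the same. */
    static int viaSource(int value) throws IOException {
        PipedOutputStream src = new PipedOutputStream();
        PipedInputStream snk = new PipedInputStream();
        src.connect(snk);
        src.write(value);
        src.close();
        return snk.read();
    }

    /** Returns true if connecting an already connected input stream is rejected. */
    static boolean secondConnectFails() throws IOException {
        PipedOutputStream src = new PipedOutputStream();
        PipedInputStream snk = new PipedInputStream(src); // connected by the constructor
        try {
            snk.connect(new PipedOutputStream());
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(viaSink(42) == viaSource(42));
        System.out.println(secondConnectFails());
    }
}
```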

setMaxBlockDuration

public void setMaxBlockDuration(long v)

getMaxBlockDuration

public long getMaxBlockDuration()

getClosedByWriter

protected boolean getClosedByWriter()

setClosedByWriter

protected void setClosedByWriter(boolean v)

getClosedByReader

protected boolean getClosedByReader()

setClosedByReader

protected void setClosedByReader(boolean v)

getConnected

public boolean getConnected()

setConnected

protected void setConnected(boolean v)

receive

public void receive(int b)
             throws java.io.IOException
Receives a byte of data. This method will block while the receiving buffer is full, until space becomes available or waiting times out.

Parameters:
b - the byte being received
Throws:
java.io.IOException - If the pipe is broken, unconnected, closed, or if an I/O error occurs; specifically, a PipeFullException, if the receiving buffer is full and waiting times out.

receive

public void receive(byte[] b,
                    int off,
                    int len)
             throws java.io.IOException
Receives data into an array of bytes. This method will block while the receiving buffer is full, until space becomes available or waiting times out.

Parameters:
b - the buffer into which the data is received
off - the start offset of the data
len - the maximum number of bytes received
Throws:
java.io.IOException - If the pipe is broken, unconnected, closed, or if an I/O error occurs; specifically, a PipeFullException, if the receiving buffer is full and waiting times out.

checkStateForReceive

private void checkStateForReceive()
                           throws java.io.IOException
Throws:
java.io.IOException

awaitSpace

private void awaitSpace()
                 throws java.io.IOException
Throws:
java.io.IOException

receivedLast

public void receivedLast()
Notifies all waiting threads that the last byte of data has been received.


read

public int read()
         throws java.io.IOException
Reads the next byte of data from this piped input stream. The value byte is returned as an int in the range 0 to 255. This method blocks until input data is available, the end of the stream is detected, or an exception is thrown.

Specified by:
read in class java.io.InputStream
Returns:
the next byte of data, or -1 if the end of the stream is reached.
Throws:
java.io.IOException - if the pipe is unconnected, broken, closed, or if an I/O error occurs.

read

public int read(byte[] b,
                int off,
                int len)
         throws java.io.IOException
Reads up to len bytes of data from this piped input stream into an array of bytes. Fewer than len bytes will be read if the end of the data stream is reached or if len exceeds the pipe's buffer size. If len is zero, then no bytes are read and 0 is returned; otherwise, the method blocks until at least 1 byte of input is available, the end of the stream has been detected, or an exception is thrown.

Overrides:
read in class java.io.InputStream
Parameters:
b - the buffer into which the data is read.
off - the start offset in the destination array b
len - the maximum number of bytes read.
Returns:
the total number of bytes read into the buffer, or -1 if there is no more data because the end of the stream has been reached.
Throws:
java.lang.NullPointerException - If b is null.
java.lang.IndexOutOfBoundsException - If off is negative, len is negative, or len is greater than b.length - off
java.io.IOException - if the pipe is broken, unconnected, closed, or if an I/O error occurs.
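Since read may return fewer than len bytes, callers usually read in a loop until -1 is returned. The following is a minimal sketch of such a loop; it is written against the general java.io.InputStream contract so that it applies to any piped input stream, and ReadLoopSketch and drain are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical helper; works with any InputStream, including piped ones. */
class ReadLoopSketch {

    /** Reads from the stream in chunks until the end of the stream is reached. */
    static byte[] drain(InputStream in, int chunkSize) throws IOException {
        if (chunkSize <= 0) throw new IllegalArgumentException("chunkSize must be positive");
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        byte[] chunk = new byte[chunkSize];
        int n;
        // read returns the number of bytes actually stored, which may be less than chunkSize
        while ((n = in.read(chunk, 0, chunk.length)) != -1) {
            collected.write(chunk, 0, n);
        }
        return collected.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3, 4, 5};
        byte[] copy = drain(new java.io.ByteArrayInputStream(data), 2);
        System.out.println("bytes read: " + copy.length);
    }
}
```

Only the first n bytes of the chunk are valid after each call, so the loop must never assume that the whole array was filled.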

available

public int available()
              throws java.io.IOException
Returns the number of bytes that can be read from this input stream without blocking.

Overrides:
available in class java.io.InputStream
Returns:
the number of bytes that can be read from this input stream without blocking, or 0 if this input stream has been closed by invoking its close() method, or if the pipe is unconnected, or broken.
Throws:
java.io.IOException - if an I/O error occurs.

close

public void close()
           throws java.io.IOException
Closes this piped input stream and releases any system resources associated with the stream.

Specified by:
close in interface java.io.Closeable
Overrides:
close in class java.io.InputStream
Throws:
java.io.IOException - if an I/O error occurs.
