001    package org.LiveGraph.dataFile.read;
002    
003    import java.io.IOException;
004    import java.io.InputStream;
005    
006    import org.LiveGraph.dataFile.common.PipeClosedByReaderException;
007    import org.LiveGraph.dataFile.common.PipeFullException;
008    import org.LiveGraph.dataFile.common.PipeNotConnectedException;
009    import org.LiveGraph.dataFile.write.PipedOutputStream;
010    
011    
012    /**
013     * This class makes Java's own {@code PipedInputStream} fit for reading by multiple Threads as
014     * required for LiveGraph.<br />
015     * <p>The thread handling built into Java's {@code java.io.PipedInputStream} gives, at the very best,
016     * reasons to hope for improvement. Sun seems to be aware of the problem: for instance, a Java API
017     * developer writes directly in the source comments of the officially distributed JDK 1.6.0 source
018     * package: &quot;<em>[...] identification of the read and write sides needs to be more
019     * sophisticated [...]</em>&quot; (see the non-Javadoc comments in the source for
020     * {@code java.io.PipedInputStream}, JDK 1.6.0, lines 38-41). However, to date the problem remains.</p>
021     * <p>For LiveGraph specifically, the problem is that {@code PipedInputStream} remembers
022     * the {@code Thread} that performed the latest read operation and checks before the following
023     * receive operation from the {@code PipedOutputStream}, whether that {@code Thread} is still alive.
024     * However, LiveGraph creates a new {@code Thread} for each update in order to make sure that the
025     * application remains responsive even if the amount of new data is large; the old threads are
026     * discarded, which causes {@code PipedInputStream} to throw an exception.<br />
027     * As a second problem, {@code PipedInputStream} causes the {@code write}-call of the
028     * {@code PipedOutputStream} to block indefinitely if the memory buffer is full. If the LiveGraph update
029     * frequency is set too low, the buffer may fill up which would cause the data producing part of the
030     * application to block. This is highly undesirable - while a short, time-limited block may be ok,
031     * an exception should be thrown if the buffer remains full for a long time to indicate to the developer
032     * that the chosen buffer size is not sufficiently large for the particular application.</p>
033     * <p>Unfortunately, the choice of scope classifiers for several methods in
034     * {@code java.io.PipedInputStream} is less than perfect. For instance, the method
035     * {@code receive(byte b[], int off, int len)} has package scope and cannot be overridden to
036     * resolve the above issues. In addition, the inappropriate use of package-visible variables such
037     * as {@code connected} instead of getter and setter methods makes overriding attempts useless. This
038     * forces LiveGraph to subclass {@code InputStream} directly to create a better version of a piped input
039     * stream and reimplement <em>all</em> of {@code java.io.PipedInputStream}'s methods, thus unnecessarily
040     * replicating a lot of code. This may become unnecessary in future if the above problems are resolved
041     * or if LiveGraph is adapted to use the {@code java.nio} channel-based I/O instead of the
042     * traditional {@code java.io} stream-based approach (this would be a good idea anyway, if time permits
043     * to make these changes at some point in the future). For now, the source code of this class is copied
044     * from {@code java.io.PipedInputStream} that is distributed in the source package for JDK 1.6.0 and
045     * changed where necessary.</p>
046     * 
047     * <p>
048     *   <strong>LiveGraph</strong>
049     *   (<a href="http://www.live-graph.org" target="_blank">http://www.live-graph.org</a>).
050     * </p> 
051     * <p>Copyright (c) 2007-2008 by G. Paperin.</p>
052     * <p>File: PipedInputStream.java</p>
053     * <p style="font-size:smaller;">Redistribution and use in source and binary forms, with or
054     *    without modification, are permitted provided that the following terms and conditions are met:
055     * </p>
056     * <p style="font-size:smaller;">1. Redistributions of source code must retain the above
057     *    acknowledgement of the LiveGraph project and its web-site, the above copyright notice,
058     *    this list of conditions and the following disclaimer.<br />
059     *    2. Redistributions in binary form must reproduce the above acknowledgement of the
060     *    LiveGraph project and its web-site, the above copyright notice, this list of conditions
061     *    and the following disclaimer in the documentation and/or other materials provided with
062     *    the distribution.<br />
063     *    3. All advertising materials mentioning features or use of this software or any derived
064     *    software must display the following acknowledgement:<br />
065     *    <em>This product includes software developed by the LiveGraph project and its
066     *    contributors.<br />(http://www.live-graph.org)</em><br />
067     *    4. All advertising materials distributed in form of HTML pages or any other technology
068     *    permitting active hyper-links that mention features or use of this software or any
069     *    derived software must display the acknowledgment specified in condition 3 of this
070     *    agreement, and in addition, include a visible and working hyper-link to the LiveGraph
071     *    homepage (http://www.live-graph.org).
072     * </p>
073     * <p style="font-size:smaller;">THIS SOFTWARE IS PROVIDED &quot;AS IS&quot;, WITHOUT WARRANTY
074     *    OF ANY KIND, EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
075     *    MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND  NONINFRINGEMENT. IN NO EVENT SHALL
076     *    THE AUTHORS, CONTRIBUTORS OR COPYRIGHT  HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
077     *    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING  FROM, OUT OF OR
078     *    IN CONNECTION WITH THE SOFTWARE OR THE USE OR  OTHER DEALINGS IN THE SOFTWARE.
079     * </p>
080     * 
081     * @author Greg Paperin (<a href="http://www.paperin.org" target="_blank">http://www.paperin.org</a>)
082     * @version {@value org.LiveGraph.LiveGraph#version}
083     * @see java.io.PipedInputStream
084     */
085    public class PipedInputStream extends InputStream {
086    
087    
088    private static final int DEFAULT_PIPE_SIZE = 1024; // bytes
089    private static final long DEFAULT_MAX_BLOCK_DURATION = 2000; // millisseconds
090    
091    
092    private boolean closedByWriter = false;
093    private boolean closedByReader = false;
094    private boolean connected = false;
095    
096    private long maxBlockDuration = DEFAULT_MAX_BLOCK_DURATION;
097    private long poolingPeriod = Math.min(1000L, Math.max(1L, DEFAULT_MAX_BLOCK_DURATION / 5L));
098    
099    
100    /**
101     * The circular buffer into which incoming data is placed.
102     */
103    private byte buffer[];
104    
105    /**
106     * The index of the position in the circular buffer at which the next byte of data will be
107     * stored when received from the connected piped output stream. <code>in&lt;0</code> implies
108     * the buffer is empty, <code>in==out</code> implies the buffer is full
109     */
110    private int in = -1;
111    
112    /**
113     * The index of the position in the circular buffer at which the next byte of data will be
114     * read by this piped input stream.
115     */
116    private int out = 0;
117    
118    /**
119     * Creates a <code>PipedInputStream</code> so that it is not yet
120     * {@linkplain #connect(org.LiveGraph.dataFile.write.PipedOutputStream) connected}.
121     * It must be connected to a <code>PipedOutputStream</code> before being used.
122     */
123    public PipedInputStream() {
124            initPipe(DEFAULT_PIPE_SIZE);
125    }
126    
127    /**
128     * Creates a <code>PipedInputStream</code> so that it is not yet
129     * {@linkplain #connect(org.LiveGraph.dataFile.write.PipedOutputStream) connected} and uses the
130     * specified pipe size for the pipe's buffer.
131     * It must be connected to a <code>PipedOutputStream</code> before being used.
132     *
133     * @param      pipeSize the size of the pipe's buffer.
134     * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
135     */
136    public PipedInputStream(int pipeSize) {
137            initPipe(pipeSize);
138    }
139    
140    /**
141     * Creates a <code>PipedInputStream</code> so that it is connected to the piped output
142     * stream <code>src</code>. Data bytes written to <code>src</code> will then be available
143     * as input from this stream.
144     *
145     * @param      src   the stream to connect to.
146     * @exception  IOException  if an I/O error occurs.
147     */
148    public PipedInputStream(PipedOutputStream src) throws IOException {
149        this(src, DEFAULT_PIPE_SIZE);
150    }
151    
152    /**
153     * Creates a <code>PipedInputStream</code> so that it is connected to the piped output stream
154     * <code>src</code> and uses the specified pipe size for the pipe's buffer.
155     * Data bytes written to <code>src</code> will then be available as input from this stream.
156     *
157     * @param      src   the stream to connect to.
158     * @param      pipeSize the size of the pipe's buffer.
159     * @exception  IOException  if an I/O error occurs.
160     * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
161     */
162    public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
163            initPipe(pipeSize);
164            connect(src);
165    }
166    
167    private void initPipe(int pipeSize) {
168            if (pipeSize <= 0)
169             throw new IllegalArgumentException("Pipe Size <= 0");
170         buffer = new byte[pipeSize];
171    }
172    
173    /**
174     * Causes this piped input stream to be connected to the piped output stream <code>src</code>.
175     * If this object is already connected to some other piped output  stream, an <code>IOException</code>
176     * is thrown.<br />
177     * If <code>src</code> is an unconnected piped output stream and <code>snk</code>
178     * is an unconnected piped input stream, they may be connected by either the call:<br />
179     * <pre><code>snk.connect(src)</code></pre><br />
180     * or the call:<br />
181     * <pre><code>src.connect(snk)</code></pre><br />
182     * The two calls have the same effect.
183     *
184     * @param      src   The piped output stream to connect to.
185     * @exception  IOException  if an I/O error occurs.
186     */
187    public void connect(PipedOutputStream src) throws IOException {
188            
189            if (null == src)
190                    throw new NullPointerException("Cannot connect to a null source");
191            
192            src.connect(this);
193            
194            this.in = -1;
195            this.out = 0;
196            this.setConnected(true);
197    }
198    
199    public synchronized void setMaxBlockDuration(long v) {
200            maxBlockDuration = Math.max(0L, v);
201            poolingPeriod = Math.min(1000L, Math.max(1L, maxBlockDuration / 5L));
202    }
203    
204    public synchronized long getMaxBlockDuration() {
205            return maxBlockDuration;
206    }
207    
208    protected synchronized boolean getClosedByWriter() {
209            return closedByWriter;
210    }
211    
212    protected synchronized void setClosedByWriter(boolean v) {
213            closedByWriter = v;
214    }
215    
216    protected synchronized boolean getClosedByReader() {
217            return closedByReader; 
218    }
219    
220    protected synchronized void setClosedByReader(boolean v) {
221            closedByReader = v; 
222    }
223    
224    public synchronized boolean getConnected() {
225            return connected;
226    }
227    
228    protected synchronized void setConnected(boolean v) {
229            connected = v;
230    }
231    
232    /**
233     * Receives a byte of data. This method will block if no input is available.
234     * @param b the byte being received
235     * @exception IOException If the pipe is broken,
236     * {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected}, closed, or if an
237     * I/O error occurs; specifically, a {@link PipeFullException}, if the receiving buffer is full
238     * and waiting times out.
239     */
240    public synchronized void receive(int b) throws IOException {
241            
242        awaitSpace();
243        
244        if (in < 0) {
245            in = 0;
246            out = 0;
247        }
248        
249        buffer[in++] = (byte)(b & 0xFF);
250        if (in >= buffer.length)
251            in = 0;
252    }
253    
254    /**
255     * Receives data into an array of bytes. This method will block until some input is available.
256     * @param b the buffer into which the data is received
257     * @param off the start offset of the data
258     * @param len the maximum number of bytes received
259     * @exception IOException If the pipe is broken,
260     * {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected}, closed, or if an
261     * I/O error occurs; specifically, a {@link PipeFullException}, if the receiving buffer is full
262     * and waiting times out.
263     */
264    public synchronized void receive(byte b[], int off, int len)  throws IOException {
265            
266            int bytesToTransfer = len;
267            
268            while (bytesToTransfer > 0) {
269                    
270                    awaitSpace();
271                    
272                    int nextTransferAmount = 0;
273                    if (out < in) {
274                            nextTransferAmount = buffer.length - in;
275                    } else if (out > in) {
276                            if (in == -1) {
277                                    in = out = 0;
278                                    nextTransferAmount = buffer.length - in;
279                            } else {
280                                    nextTransferAmount = out - in;
281                            }
282                    }
283                    
284                    if (nextTransferAmount > bytesToTransfer)
285                            nextTransferAmount = bytesToTransfer;
286                    
287                    assert(nextTransferAmount > 0);
288                    
289                    System.arraycopy(b, off, buffer, in, nextTransferAmount);
290                    
291                    bytesToTransfer -= nextTransferAmount;
292                    off += nextTransferAmount;
293                    in += nextTransferAmount;
294                    if (in >= buffer.length)
295                            in = 0;
296            }
297    }
298    
299    private void checkStateForReceive() throws IOException {
300            
301            if (!getConnected())
302                    throw new PipeNotConnectedException("Pipe not connected");
303            
304            if (getClosedByWriter())
305                    throw new PipeClosedByReaderException("Pipe closed by writer");
306            
307            if (getClosedByReader())
308                    throw new PipeClosedByReaderException("Pipe closed by reader");
309            
310    }
311    
312    
313    private void awaitSpace() throws IOException {
314            
315            checkStateForReceive();
316            
317            if (in != out)
318                    return;
319            
320            long startedWaiting = System.currentTimeMillis();
321            
322            while (in == out) {
323                    checkStateForReceive();
324                    
325                    if (System.currentTimeMillis() - startedWaiting > maxBlockDuration)
326                            throw new PipeFullException("Cannot receive data: buffer full?");
327                    
328                    // kick any waiting readers and wait:
329                    notifyAll();
330                    try { wait(poolingPeriod); }
331                    catch (InterruptedException ex) { }
332            }
333    }
334    
335    /**
336     * Notifies all waiting threads that the last byte of data has been
337     * received.
338     */
339    public synchronized void receivedLast() {
340            setClosedByWriter(true);
341            notifyAll();
342    }
343    
344    /**
345     * Reads the next byte of data from this piped input stream. The value byte is returned as
346     * an <code>int</code> in the range <code>0</code> to <code>255</code>. This method blocks
347     * until input data is available, the end of the stream is detected, or an exception is thrown.
348     *
349     * @return     the next byte of data, or <code>-1</code> if the end of the
350     *             stream is reached.
351     * @exception  IOException  if the pipe is
352     * {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected}, broken, closed, or if an
353     * I/O error occurs.
354     */
355    @Override
356    public synchronized int read() throws IOException {
357            
358            if (!getConnected()) {
359                    throw new PipeNotConnectedException("Pipe not connected");
360            } else if (getClosedByReader()) {
361                    throw new PipeClosedByReaderException("Pipe closed by reader");
362            }
363            
364            // Wait for data:
365            if (in < 0) {
366                    while (in < 0) {
367                            
368                            // If closed by writer, return EOF
369                            if (getClosedByWriter())
370                                    return -1;
371                            
372                            // Might be a writer waiting:
373                            notifyAll();
374                            try { wait(poolingPeriod); }
375                            catch (InterruptedException ex) { }
376                    }
377            }
378            
379            int ret = buffer[out++] & 0xFF;
380            if (out >= buffer.length)
381                    out = 0;
382            
383            if (in == out)
384                    in = -1;
385    
386            return ret;
387    }
388    
389    /**
390     * Reads up to <code>len</code> bytes of data from this piped input stream into an array of bytes.
391     * Less than <code>len</code> bytes will be read if the end of the data stream is reached or if 
392     * <code>len</code> exceeds the pipe's buffer size. If <code>len </code> is zero, then no bytes
393     * are read and 0 is returned; otherwise, the method blocks until at least 1 byte of input is 
394     * available, end of the stream has been detected, or an exception is thrown.
395     *
396     * @param      b     the buffer into which the data is read.
397     * @param      off   the start offset in the destination array <code>b</code>
398     * @param      len   the maximum number of bytes read.
399     * @return     the total number of bytes read into the buffer, or <code>-1</code> if there is no
400     *                      more data because the end of the stream has been reached.
401     * @exception  NullPointerException If <code>b</code> is <code>null</code>.
402     * @exception  IndexOutOfBoundsException If <code>off</code> is negative, 
403     * <code>len</code> is negative, or <code>len</code> is greater than 
404     * <code>b.length - off</code>
405     * @exception  IOException if the pipe is broken,
406     * {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected}, closed, or if an
407     * I/O error occurs.
408     */
409    @Override
410    public synchronized int read(byte b[], int off, int len)  throws IOException {
411            
412            if (null == b)
413                    throw new NullPointerException("Cannot read into a null buffer");
414                    
415            if (off < 0 || len < 0 || len > b.length - off)
416                    throw new IndexOutOfBoundsException();
417                    
418            if (len == 0)
419                    return 0;
420    
421        // Possibly wait on the first character:
422            int c = read();
423            if (c < 0)
424                    return -1;
425    
426            b[off] = (byte) c;
427            int rlen = 1;
428            while (in >= 0 && len > 1) {
429        
430                    int available; 
431    
432                    if (in > out)
433                            available = Math.min((buffer.length - out), (in - out));
434                    else
435                            available = buffer.length - out;
436    
437                    // A byte is read beforehand outside the loop
438                    if (available > (len - 1))
439                            available = len - 1;
440            
441                    System.arraycopy(buffer, out, b, off + rlen, available);
442                    out += available;
443                    rlen += available; 
444                    len -= available;
445        
446                    if (out >= buffer.length)
447                            out = 0;
448                    
449                    if (in == out)
450                            in = -1;
451            }
452            
453            return rlen;
454    }
455    
456    /**
457     * Returns the number of bytes that can be read from this input stream without blocking.
458     *
459     * @return the number of bytes that can be read from this input stream without blocking,
460     *                 or {@code 0} if this input stream has been closed by invoking its {@link #close()} method,
461     *                 or if the pipe is {@link #connect(org.LiveGraph.dataFile.write.PipedOutputStream) unconnected},
462     *                 or broken.
463     * @exception  IOException  if an I/O error occurs.
464     */
465    @Override
466    public synchronized int available() throws IOException {
467            
468            if (in < 0)
469                    return 0;
470            
471            if (in == out)
472                    return buffer.length;
473            
474            if (in > out)
475                    return in - out;
476            
477            return in + buffer.length - out;
478    }
479    
480    /**
481     * Closes this piped input stream and releases any system resources associated with the stream.
482     * @exception  IOException  if an I/O error occurs.
483     */
484    @Override
485    public synchronized void close() throws IOException {
486            setClosedByReader(true);
487            in = -1;
488    }
489    
490    }  // public class PipedInputStream