001 package org.LiveGraph.dataFile.write; 002 003 import java.io.IOException; 004 import java.io.OutputStream; 005 006 import org.LiveGraph.dataFile.common.PipeFullException; 007 import org.LiveGraph.dataFile.read.PipedInputStream; 008 009 010 public class PipedOutputStream extends OutputStream { 011 012 013 private PipedInputStream sink; 014 015 /** 016 * Creates a piped output stream that is not yet connected to a piped input stream. 017 * It must be connected to a piped input stream, either by the receiver or the sender, before being used. 018 */ 019 public PipedOutputStream() { 020 super(); 021 } 022 023 /** 024 * Creates a piped output stream connected to the specified piped input stream. 025 * Data bytes written to this stream will then be available as input from <code>snk</code>. 026 * @param snk The piped input stream to connect to. 027 * @exception IOException if an I/O error occurs. 028 */ 029 public PipedOutputStream(PipedInputStream snk) throws IOException { 030 super(); 031 connect(snk); 032 } 033 034 035 /** 036 * Connects this piped output stream to a receiver. If this object is already connected to some other 037 * piped input stream, an <code>IOException</code> is thrown.<br /> 038 * If <code>snk</code> is an unconnected piped input stream and <code>src</code> is an unconnected piped 039 * output stream, they may be connected by either the call:<br /> 040 * <pre>src.connect(snk)</pre><br /> 041 * or the call:<br /> 042 * <pre>snk.connect(src)</pre><br /> 043 * The two calls have the same effect. 044 * 045 * @param sink the piped input stream to connect to. 046 * @exception IOException if an I/O error occurs. 047 */ 048 public synchronized void connect(PipedInputStream sink) throws IOException { 049 050 if (null == sink) 051 throw new NullPointerException("Cannot connect to a null sink"); 052 053 // Prevent recursive calls: 054 if (sink == this.sink) 055 return; 056 057 if (sink.getConnected()) 058 throw new IOException("Sink is already connected"); 059 060 this.sink = sink; 061 sink.connect(this); 062 } 063 064 /** 065 * Writes the specified <code>byte</code> to the piped output stream. <br /> 066 * Implements the <code>write</code> method of <code>OutputStream</code>. 067 * This method blocks for a while to wait until the byte is written to 068 * the output stream, but quits the block and throws a {@code PipedInputStream.PipeFullException} if 069 * the data cannot be written within a certain time period. 070 * 071 * @param b the <code>byte</code> to be written. 072 * @exception IOException if the pipe is broken, 073 * {@link #connect(org.LiveGraph.dataFile.read.PipedInputStream) unconnected}, closed, 074 * or if an I/O error occurs; specifically, a {link {@link PipeFullException}, if the 075 * receiving buffer is full and waiting times out. 076 */ 077 @Override 078 public void write(int b) throws IOException { 079 080 if (null == sink) 081 throw new IOException("Pipe not connected"); 082 083 sink.receive(b); 084 } 085 086 /** 087 * Writes <code>len</code> bytes from the specified byte array starting at offset <code>off</code> to 088 * this piped output stream. This method blocks for a while to wait until all the bytes are written to 089 * the output stream, but quits the block and throws a {@code PipedInputStream.PipeFullException} if 090 * the data cannot be written within a certain time period. 091 * 092 * @param b the data. 093 * @param off the start offset in the data. 094 * @param len the number of bytes to write. 095 * @exception IOException if the pipe is broken, 096 * {@link #connect(org.LiveGraph.dataFile.read.PipedInputStream) unconnected}, closed, 097 * or if an I/O error occurs; specifically, a {link {@link PipeFullException}, if the 098 * receiving buffer is full and waiting times out. 099 */ 100 @Override 101 public void write(byte b[], int off, int len) throws IOException { 102 103 if (null == sink) 104 throw new IOException("Pipe not connected"); 105 106 if (null == b ) 107 throw new NullPointerException("Cannot read into a new buffer"); 108 109 if ( (off < 0) || (off > b.length) || (len < 0) || (off + len > b.length) || (off + len < 0) ) 110 throw new IndexOutOfBoundsException(); 111 112 if (len == 0) 113 return; 114 115 sink.receive(b, off, len); 116 } 117 118 /** 119 * Flushes this output stream and forces any buffered output bytes to be written out. 120 * This will notify any readers that bytes are waiting in the pipe. 121 * 122 * @exception IOException if an I/O error occurs. 123 */ 124 @Override 125 public synchronized void flush() throws IOException { 126 if (null != sink) { 127 synchronized (sink) { 128 sink.notifyAll(); 129 } 130 } 131 } 132 133 /** 134 * Closes this piped output stream and releases any system resources associated with this stream. 135 * This stream may no longer be used for writing bytes. 136 * 137 * @exception IOException if an I/O error occurs. 138 */ 139 @Override 140 public void close() throws IOException { 141 if (null == sink) 142 return; 143 sink.receivedLast(); 144 } 145 146 147 } // public class PipedOutputStream