001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.io.input; 018 019import static org.apache.commons.io.IOUtils.EOF; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.List; 026 027import org.apache.commons.io.IOUtils; 028 029/** 030 * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the 031 * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}. 032 * <p> 033 * A typical application may be the generation of a {@link java.security.MessageDigest} on the fly. 034 * </p> 035 * <p> 036 * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually 037 * aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must 038 * be used. 039 * </p> 040 * 041 * @see MessageDigestCalculatingInputStream 042 */ 043public class ObservableInputStream extends ProxyInputStream { 044 045 /** 046 * Abstracts observer callback for {@code ObservableInputStream}s. 047 */ 048 public static abstract class Observer { 049 050 /** 051 * Called to indicate that the {@link ObservableInputStream} has been closed. 052 * 053 * @throws IOException if an I/O error occurs. 054 */ 055 @SuppressWarnings("unused") // Possibly thrown from subclasses. 056 public void closed() throws IOException { 057 // noop 058 } 059 060 /** 061 * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have 062 * been called, and are about to invoke data. 063 * 064 * @param buffer The byte array, which has been passed to the read call, and where data has been stored. 065 * @param offset The offset within the byte array, where data has been stored. 066 * @param length The number of bytes, which have been stored in the byte array. 067 * @throws IOException if an I/O error occurs. 068 */ 069 @SuppressWarnings("unused") // Possibly thrown from subclasses. 070 public void data(final byte[] buffer, final int offset, final int length) throws IOException { 071 // noop 072 } 073 074 /** 075 * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream}, 076 * and will return a value. 077 * 078 * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case, 079 * {@link #finished()} will be invoked instead. 080 * @throws IOException if an I/O error occurs. 081 */ 082 @SuppressWarnings("unused") // Possibly thrown from subclasses. 083 public void data(final int value) throws IOException { 084 // noop 085 } 086 087 /** 088 * Called to indicate that an error occurred on the underlying stream. 089 * 090 * @param exception the exception to throw 091 * @throws IOException if an I/O error occurs. 092 */ 093 public void error(final IOException exception) throws IOException { 094 throw exception; 095 } 096 097 /** 098 * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times, 099 * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF. 100 * 101 * @throws IOException if an I/O error occurs. 102 */ 103 @SuppressWarnings("unused") // Possibly thrown from subclasses. 104 public void finished() throws IOException { 105 // noop 106 } 107 } 108 109 private final List<Observer> observers; 110 111 /** 112 * Creates a new ObservableInputStream for the given InputStream. 113 * 114 * @param inputStream the input stream to observe. 115 */ 116 public ObservableInputStream(final InputStream inputStream) { 117 this(inputStream, new ArrayList<>()); 118 } 119 120 /** 121 * Creates a new ObservableInputStream for the given InputStream. 122 * 123 * @param inputStream the input stream to observe. 124 * @param observers List of observer callbacks. 125 */ 126 private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) { 127 super(inputStream); 128 this.observers = observers; 129 } 130 131 /** 132 * Creates a new ObservableInputStream for the given InputStream. 133 * 134 * @param inputStream the input stream to observe. 135 * @param observers List of observer callbacks. 136 * @since 2.9.0 137 */ 138 public ObservableInputStream(final InputStream inputStream, final Observer... observers) { 139 this(inputStream, Arrays.asList(observers)); 140 } 141 142 /** 143 * Adds an Observer. 144 * 145 * @param observer the observer to add. 146 */ 147 public void add(final Observer observer) { 148 observers.add(observer); 149 } 150 151 @Override 152 public void close() throws IOException { 153 IOException ioe = null; 154 try { 155 super.close(); 156 } catch (final IOException e) { 157 ioe = e; 158 } 159 if (ioe == null) { 160 noteClosed(); 161 } else { 162 noteError(ioe); 163 } 164 } 165 166 /** 167 * Reads all data from the underlying {@link InputStream}, while notifying the observers. 168 * 169 * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception. 170 */ 171 public void consume() throws IOException { 172 final byte[] buffer = IOUtils.byteArray(); 173 while (read(buffer) != EOF) { 174 // empty 175 } 176 } 177 178 /** 179 * Gets all currently registered observers. 180 * 181 * @return a list of the currently registered observers. 182 * @since 2.9.0 183 */ 184 public List<Observer> getObservers() { 185 return observers; 186 } 187 188 /** 189 * Notifies the observers by invoking {@link Observer#finished()}. 190 * 191 * @throws IOException Some observer has thrown an exception, which is being passed down. 192 */ 193 protected void noteClosed() throws IOException { 194 for (final Observer observer : getObservers()) { 195 observer.closed(); 196 } 197 } 198 199 /** 200 * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments. 201 * 202 * @param value Passed to the observers. 203 * @throws IOException Some observer has thrown an exception, which is being passed down. 204 */ 205 protected void noteDataByte(final int value) throws IOException { 206 for (final Observer observer : getObservers()) { 207 observer.data(value); 208 } 209 } 210 211 /** 212 * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments. 213 * 214 * @param buffer Passed to the observers. 215 * @param offset Passed to the observers. 216 * @param length Passed to the observers. 217 * @throws IOException Some observer has thrown an exception, which is being passed down. 218 */ 219 protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException { 220 for (final Observer observer : getObservers()) { 221 observer.data(buffer, offset, length); 222 } 223 } 224 225 /** 226 * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument. 227 * 228 * @param exception Passed to the observers. 229 * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same 230 * exception, which has been passed as an argument. 231 */ 232 protected void noteError(final IOException exception) throws IOException { 233 for (final Observer observer : getObservers()) { 234 observer.error(exception); 235 } 236 } 237 238 /** 239 * Notifies the observers by invoking {@link Observer#finished()}. 240 * 241 * @throws IOException Some observer has thrown an exception, which is being passed down. 242 */ 243 protected void noteFinished() throws IOException { 244 for (final Observer observer : getObservers()) { 245 observer.finished(); 246 } 247 } 248 249 private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException { 250 if (ioe != null) { 251 noteError(ioe); 252 throw ioe; 253 } 254 if (result == EOF) { 255 noteFinished(); 256 } else if (result > 0) { 257 noteDataBytes(buffer, offset, result); 258 } 259 } 260 261 @Override 262 public int read() throws IOException { 263 int result = 0; 264 IOException ioe = null; 265 try { 266 result = super.read(); 267 } catch (final IOException ex) { 268 ioe = ex; 269 } 270 if (ioe != null) { 271 noteError(ioe); 272 throw ioe; 273 } 274 if (result == EOF) { 275 noteFinished(); 276 } else { 277 noteDataByte(result); 278 } 279 return result; 280 } 281 282 @Override 283 public int read(final byte[] buffer) throws IOException { 284 int result = 0; 285 IOException ioe = null; 286 try { 287 result = super.read(buffer); 288 } catch (final IOException ex) { 289 ioe = ex; 290 } 291 notify(buffer, 0, result, ioe); 292 return result; 293 } 294 295 @Override 296 public int read(final byte[] buffer, final int offset, final int length) throws IOException { 297 int result = 0; 298 IOException ioe = null; 299 try { 300 result = super.read(buffer, offset, length); 301 } catch (final IOException ex) { 302 ioe = ex; 303 } 304 notify(buffer, offset, result, ioe); 305 return result; 306 } 307 308 /** 309 * Removes an Observer. 310 * 311 * @param observer the observer to remove 312 */ 313 public void remove(final Observer observer) { 314 observers.remove(observer); 315 } 316 317 /** 318 * Removes all Observers. 319 */ 320 public void removeAllObservers() { 321 observers.clear(); 322 } 323 324}