View Javadoc

1   /* $Id: WriterPoolMember.java 6900 2010-06-19 19:33:12Z nlevitt $
2    *
3    * Created on July 21st, 2006
4    *
5    * Copyright (C) 2006 Internet Archive.
6    *
7    * This file is part of the Heritrix web crawler (crawler.archive.org).
8    *
9    * Heritrix is free software; you can redistribute it and/or modify
10   * it under the terms of the GNU Lesser Public License as published by
11   * the Free Software Foundation; either version 2.1 of the License, or
12   * any later version.
13   *
14   * Heritrix is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   * GNU Lesser Public License for more details.
18   *
19   * You should have received a copy of the GNU Lesser Public License
20   * along with Heritrix; if not, write to the Free Software
21   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22   */
23  package org.archive.io;
24  
25  import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
26  
27  import java.io.File;
28  import java.io.FileOutputStream;
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.io.OutputStream;
32  import java.text.DecimalFormat;
33  import java.text.NumberFormat;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.concurrent.atomic.AtomicInteger;
37  import java.util.logging.Logger;
38  import java.util.zip.GZIPOutputStream;
39  
40  import org.archive.util.ArchiveUtils;
41  import org.archive.util.IoUtils;
42  import org.archive.util.TimestampSerialno;
43  
44  
45  /***
46   * Member of {@link WriterPool}.
47   * Implements rotating off files, file naming with some guarantee of
48   * uniqueness, and position in file. Subclass to pick up functionality for a
49   * particular Writer type.
50   * @author stack
51   * @version $Date: 2010-06-19 19:33:12 +0000 (Sat, 19 Jun 2010) $ $Revision: 6900 $
52   */
53  public abstract class WriterPoolMember implements ArchiveFileConstants {
54      private final Logger logger = Logger.getLogger(this.getClass().getName());
55      
56      public static final String UTF8 = "UTF-8";
57      
58      /***
59       * Default file prefix.
60       * 
61       * Stands for Internet Archive Heritrix.
62       */
63      public static final String DEFAULT_PREFIX = "IAH";
64      
65      /***
66       * Value to interpolate with actual hostname.
67       */
68      public static final String HOSTNAME_VARIABLE = "${HOSTNAME}";
69      
70      /***
71       * Value to interpolate with actual hostname-port.
72       */
73      public static final String HOSTNAME_ADMINPORT_VARIABLE = "${HOSTNAME_ADMINPORT}";
74      
75      /***
76       * Default for file suffix.
77       */
78      public static final String DEFAULT_SUFFIX = HOSTNAME_VARIABLE;
79  
80      /***
81       * Reference to file we're currently writing.
82       */
83      private File f = null;
84  
85      /***
86       *  Output stream for file.
87       */
88      private OutputStream out = null;
89      
90      /***
91       * File output stream.
92       * This is needed so can get at channel to find current position in file.
93       */
94      private FileOutputStream fos;
95      
96      private final boolean compressed;
97      private List<File> writeDirs = null;
98      private String prefix = DEFAULT_PREFIX;
99      private String suffix = DEFAULT_SUFFIX;
100     private final long maxSize;
101     private final String extension;
102 
103     /***
104      * Creation date for the current file.
105      * Set by {@link #createFile()}.
106      */
107 	private String createTimestamp = "UNSET!!!";
108     
109     /***
110      * A running sequence used making unique file names.
111      */
112     final private AtomicInteger serialNo;
113     
114     /***
115      * Directories round-robin index.
116      */
117     private static int roundRobinIndex = 0;
118 
119     /***
120      * NumberFormat instance for formatting serial number.
121      *
122      * Pads serial number with zeros.
123      */
124     private static NumberFormat serialNoFormatter = new DecimalFormat("00000");
125     
126     
127     /***
128      * Buffer to reuse writing streams.
129      */
130     private final byte [] scratchbuffer = new byte[4 * 1024];
131  
132     
133     /***
134      * Constructor.
135      * Takes a stream. Use with caution. There is no upperbound check on size.
136      * Will just keep writing.
137      * 
138      * @param serialNo  used to create unique filename sequences
139      * @param out Where to write.
140      * @param file File the <code>out</code> is connected to.
141      * @param cmprs Compress the content written.
142      * @param a14DigitDate If null, we'll write current time.
143      * @throws IOException
144      */
145     protected WriterPoolMember(AtomicInteger serialNo, 
146             final OutputStream out, final File file,
147             final boolean cmprs, String a14DigitDate)
148     throws IOException {
149         this(serialNo, null, null, cmprs, -1, null);
150         this.out = out;
151         this.f = file;
152     }
153     
154     /***
155      * Constructor.
156      *
157      * @param serialNo  used to create unique filename sequences
158      * @param dirs Where to drop files.
159      * @param prefix File prefix to use.
160      * @param cmprs Compress the records written. 
161      * @param maxSize Maximum size for ARC files written.
162      * @param extension Extension to give file.
163      */
164     public WriterPoolMember(AtomicInteger serialNo, 
165             final List<File> dirs, final String prefix, 
166             final boolean cmprs, final long maxSize, final String extension) {
167         this(serialNo, dirs, prefix, "", cmprs, maxSize, extension);
168     }
169             
170     /***
171      * Constructor.
172      *
173      * @param serialNo  used to create unique filename sequences
174      * @param dirs Where to drop files.
175      * @param prefix File prefix to use.
176      * @param cmprs Compress the records written. 
177      * @param maxSize Maximum size for ARC files written.
178      * @param suffix File tail to use.  If null, unused.
179      * @param extension Extension to give file.
180      */
181     public WriterPoolMember(AtomicInteger serialNo,
182             final List<File> dirs, final String prefix, 
183             final String suffix, final boolean cmprs,
184             final long maxSize, final String extension) {
185         this.suffix = suffix;
186         this.prefix = prefix;
187         this.maxSize = maxSize;
188         this.writeDirs = dirs;
189         this.compressed = cmprs;
190         this.extension = extension;
191         this.serialNo = serialNo;
192     }
193 
194 	/***
195 	 * Call this method just before/after any significant write.
196 	 *
197 	 * Call at the end of the writing of a record or just before we start
198 	 * writing a new record.  Will close current file and open a new file
199 	 * if file size has passed out maxSize.
200 	 * 
201 	 * <p>Creates and opens a file if none already open.  One use of this method
202 	 * then is after construction, call this method to add the metadata, then
203 	 * call {@link #getPosition()} to find offset of first record.
204 	 *
205 	 * @exception IOException
206 	 */
207     public void checkSize() throws IOException {
208         if (this.out == null ||
209                 (this.maxSize != -1 && (this.f.length() > this.maxSize))) {
210             createFile();
211         }
212     }
213 
214     /***
215      * Create a new file.
216      * Rotates off the current Writer and creates a new in its place
217      * to take subsequent writes.  Usually called from {@link #checkSize()}.
218      * @return Name of file created.
219      * @throws IOException
220      */
221     protected String createFile() throws IOException {
222         TimestampSerialno tsn = getTimestampSerialNo();
223         String name = this.prefix + '-' + getUniqueBasename(tsn) +
224             ((this.suffix == null || this.suffix.length() <= 0)?
225                 "": "-" + this.suffix) + '.' + this.extension  +
226             ((this.compressed)? '.' + COMPRESSED_FILE_EXTENSION: "") +
227             OCCUPIED_SUFFIX;
228         this.createTimestamp = tsn.getTimestamp();
229         File dir = getNextDirectory(this.writeDirs);
230         return createFile(new File(dir, name));
231     }
232     
233     protected String createFile(final File file) throws IOException {
234     	close();
235         this.f = file;
236         this.fos = new FileOutputStream(this.f);
237         this.out = new FastBufferedOutputStream(this.fos);
238         logger.info("Opened " + this.f.getAbsolutePath());
239         return this.f.getName();
240     }
241     
242     /***
243      * @param dirs List of File objects that point at directories.
244      * @return Find next directory to write an arc too.  If more
245      * than one, it tries to round-robin through each in turn.
246      * @throws IOException
247      */
248     protected File getNextDirectory(List<File> dirs)
249     throws IOException {
250         if (WriterPoolMember.roundRobinIndex >= dirs.size()) {
251             WriterPoolMember.roundRobinIndex = 0;
252         }
253         File d = null;
254         try {
255             d = checkWriteable((File)dirs.
256                 get(WriterPoolMember.roundRobinIndex));
257         } catch (IndexOutOfBoundsException e) {
258             // Dirs list might be altered underneath us.
259             // If so, we get this exception -- just keep on going.
260         }
261         if (d == null && dirs.size() > 1) {
262             for (Iterator i = dirs.iterator(); d == null && i.hasNext();) {
263                 d = checkWriteable((File)i.next());
264             }
265         } else {
266             WriterPoolMember.roundRobinIndex++;
267         }
268         if (d == null) {
269             throw new IOException("Directories unusable.");
270         }
271         return d;
272     }
273         
274     protected File checkWriteable(File d) {
275         if (d == null) {
276             return d;
277         }
278         
279         try {
280             IoUtils.ensureWriteableDirectory(d);
281         } catch(IOException e) {
282             logger.warning("Directory " + d.getPath() + " is not" +
283                 " writeable or cannot be created: " + e.getMessage());
284             d = null;
285         }
286         return d;
287     }
288     
289     protected synchronized TimestampSerialno getTimestampSerialNo() {
290         return getTimestampSerialNo(null);
291     }
292     
293     /***
294      * Do static synchronization around getting of counter and timestamp so
295      * no chance of a thread getting in between the getting of timestamp and
296      * allocation of serial number throwing the two out of alignment.
297      * 
298      * @param timestamp If non-null, use passed timestamp (must be 14 digit
299      * ARC format), else if null, timestamp with now.
300      * @return Instance of data structure that has timestamp and serial no.
301      */
302     protected synchronized TimestampSerialno
303             getTimestampSerialNo(final String timestamp) {
304         return new TimestampSerialno((timestamp != null)?
305                 timestamp: ArchiveUtils.get14DigitDate(),
306                 serialNo.getAndIncrement());
307     }
308 
309     /***
310      * Return a unique basename.
311      *
312      * Name is timestamp + an every increasing sequence number.
313      *
314      * @param tsn Structure with timestamp and serial number.
315      *
316      * @return Unique basename.
317      */
318     private String getUniqueBasename(TimestampSerialno tsn) {
319         return tsn.getTimestamp() + "-" +
320            WriterPoolMember.serialNoFormatter.format(tsn.getSerialNumber());
321     }
322 
323 
324     /***
325      * Get the file name
326      * 
327      * @return the filename, as if uncompressed
328      */
329     protected String getBaseFilename() {
330         String name = this.f.getName();
331         if (this.compressed && name.endsWith(DOT_COMPRESSED_FILE_EXTENSION)) {
332             return name.substring(0,name.length() - 3);
333         } else if(this.compressed &&
334                 name.endsWith(DOT_COMPRESSED_FILE_EXTENSION +
335                     OCCUPIED_SUFFIX)) {
336             return name.substring(0, name.length() -
337                 (3 + OCCUPIED_SUFFIX.length()));
338         } else {
339             return name;
340         }
341     }
342 
343 	/***
344 	 * Get this file.
345 	 *
346 	 * Used by junit test to test for creation and when {@link WriterPool} wants
347      * to invalidate a file.
348 	 *
349 	 * @return The current file.
350 	 */
351     public File getFile() {
352         return this.f;
353     }
354 
355     /***
356      * Post write tasks.
357      * 
358      * Has side effects.  Will open new file if we're at the upperbound.
359      * If we're writing compressed files, it will wrap output stream with a
360      * GZIP writer with side effect that GZIP header is written out on the
361      * stream.
362      *
363      * @exception IOException
364      */
365     protected void preWriteRecordTasks()
366     throws IOException {
367         checkSize();
368         if (this.compressed) {
369             // Wrap stream in GZIP Writer.
370             // The below construction immediately writes the GZIP 'default'
371             // header out on the underlying stream.
372             this.out = new CompressedStream(this.out);
373         }
374     }
375 
376     /***
377      * Post file write tasks.
378      * If compressed, finishes up compression and flushes stream so any
379      * subsequent checks get good reading.
380      *
381      * @exception IOException
382      */
383     protected void postWriteRecordTasks()
384     throws IOException {
385         if (this.compressed) {
386             CompressedStream o = (CompressedStream)this.out;
387             o.finish();
388             o.flush();
389             o.end();
390             this.out = o.getWrappedStream();
391         }
392     }
393     
394 	/***
395      * Postion in current physical file.
396      * Used making accounting of bytes written.
397 	 * @return Position in underlying file.  Call before or after writing
398      * records *only* to be safe.
399 	 * @throws IOException
400 	 */
401     public long getPosition() throws IOException {
402         long position = 0;
403         if (this.out != null) {
404             this.out.flush();
405         }
406         if (this.fos != null) {
407             // Call flush on underlying file though probably not needed assuming
408             // above this.out.flush called through to this.fos.
409             this.fos.flush();
410             position = this.fos.getChannel().position();
411         }
412         return position;
413     }
414 
415     public boolean isCompressed() {
416         return compressed;
417     }
418     
419     /***
420      * @return number of bytes written, which is always {@code b.length}
421      */
422     protected int write(final byte [] b) throws IOException {
423     	this.out.write(b);
424     	return b.length;
425     }
426     
427 	protected void flush() throws IOException {
428 		this.out.flush();
429 	}
430 
431     /***
432      * @return 
433      * @return number of bytes written, which is always {@code len}
434      */
435 	protected int write(byte[] b, int off, int len) throws IOException {
436 		this.out.write(b, off, len);
437 		return len;
438 	}
439 
440     /***
441      * @return 
442      * @return number of bytes written, which is always {@code 1}
443      */
444 	protected int write(int b) throws IOException {
445 		this.out.write(b);
446 		return 1;
447 	}
448 	
449 	/***
450      * @deprecated Use {@link #copyFrom(InputStream,long,boolean)} instead
451      */
452     protected void readFullyFrom(final InputStream is, final long recordLength,
453     		final byte [] b)
454     throws IOException {
455         copyFrom(is, recordLength, true);
456     }
457 
458     /***
459      * @deprecated Use {@link #copyFrom(InputStream,long,boolean)} instead
460      */
461 	protected void readToLimitFrom(final InputStream is, final long limit,
462 			final byte [] b)
463 	throws IOException {
464         copyFrom(is, limit, true);
465 	}
466 
467     /***
468      * Copy bytes from the provided InputStream to the target file/stream being
469      * written.
470      * 
471      * @param is
472      *            InputStream to copy bytes from
473      * @param recordLength
474      *            expected number of bytes to copy
475      * @param enforceLength
476      *            whether to throw an exception if too many/too few bytes are
477      *            available from stream
478      * @return number of bytes written (normally equal to {@code enforceLength})
479      * @throws IOException
480      */
481     protected long copyFrom(final InputStream is, final long recordLength,
482             boolean enforceLength) throws IOException {
483         int read = scratchbuffer.length;
484         long tot = 0;
485         while ((tot < recordLength)
486                 && (read = is.read(scratchbuffer)) != -1) {
487             int write = read; 
488             // never write more than declared length
489             write = (int) Math.min(write, recordLength - tot);
490             tot += read;
491             write(scratchbuffer, 0, write);
492         }
493         if (enforceLength && tot != recordLength) {
494             // throw exception if desired for read vs. declared mismatches
495             throw new IOException("Read " + tot + " but expected "
496                     + recordLength);
497         }
498         
499         return tot;
500     }
501 
502     public void close() throws IOException {
503         if (this.out == null) {
504             return;
505         }
506         this.out.close();
507         this.out = null;
508         this.fos = null;
509         if (this.f != null && this.f.exists()) {
510             String path = this.f.getAbsolutePath();
511             if (path.endsWith(OCCUPIED_SUFFIX)) {
512                 File f = new File(path.substring(0,
513                         path.length() - OCCUPIED_SUFFIX.length()));
514                 if (!this.f.renameTo(f)) {
515                     logger.warning("Failed rename of " + path);
516                 }
517                 this.f = f;
518             }
519             
520             logger.info("Closed " + this.f.getAbsolutePath() +
521                     ", size " + this.f.length());
522         }
523     }
524     
525     protected OutputStream getOutputStream() {
526     	return this.out;
527     }
528     
529 	protected String getCreateTimestamp() {
530 		return createTimestamp;
531 	}
532     
533     
534     /***
535      * An override so we get access to underlying output stream
536      * and offer an end() that does not accompany closing underlying
537      * stream. 
538      * @author stack
539      */
540     private class CompressedStream extends GZIPOutputStream {
541         public CompressedStream(OutputStream out)
542         throws IOException {
543             super(out);
544         }
545         
546         /***
547          * @return Reference to stream being compressed.
548          */
549         OutputStream getWrappedStream() {
550             return this.out;
551         }
552 
553         /***
554          * Release the deflater's native process resources,
555          * which otherwise would not occur until either
556          * finalization or DeflaterOutputStream.close() 
557          * (which would also close underlying stream). 
558          */
559         public void end() {
560             def.end();
561         }
562         
563         
564     }
565 }