1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.archive.io;
24
25 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
26
27 import java.io.File;
28 import java.io.FileOutputStream;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.OutputStream;
32 import java.text.DecimalFormat;
33 import java.text.NumberFormat;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.logging.Logger;
38 import java.util.zip.GZIPOutputStream;
39
40 import org.archive.util.ArchiveUtils;
41 import org.archive.util.IoUtils;
42 import org.archive.util.TimestampSerialno;
43
44
/**
 * Member of {@link WriterPool}.
 * Implements file rotation, file naming with some guarantee of
 * uniqueness, and tracking of the position in the current file. Subclass
 * to pick up this functionality for a particular Writer type.
 * @author stack
 * @version $Date: 2010-06-19 19:33:12 +0000 (Sat, 19 Jun 2010) $ $Revision: 6900 $
 */
53 public abstract class WriterPoolMember implements ArchiveFileConstants {
54 private final Logger logger = Logger.getLogger(this.getClass().getName());
55
56 public static final String UTF8 = "UTF-8";
57
58 /***
59 * Default file prefix.
60 *
61 * Stands for Internet Archive Heritrix.
62 */
63 public static final String DEFAULT_PREFIX = "IAH";
64
65 /***
66 * Value to interpolate with actual hostname.
67 */
68 public static final String HOSTNAME_VARIABLE = "${HOSTNAME}";
69
70 /***
71 * Value to interpolate with actual hostname-port.
72 */
73 public static final String HOSTNAME_ADMINPORT_VARIABLE = "${HOSTNAME_ADMINPORT}";
74
75 /***
76 * Default for file suffix.
77 */
78 public static final String DEFAULT_SUFFIX = HOSTNAME_VARIABLE;
79
80 /***
81 * Reference to file we're currently writing.
82 */
83 private File f = null;
84
85 /***
86 * Output stream for file.
87 */
88 private OutputStream out = null;
89
90 /***
91 * File output stream.
92 * This is needed so can get at channel to find current position in file.
93 */
94 private FileOutputStream fos;
95
96 private final boolean compressed;
97 private List<File> writeDirs = null;
98 private String prefix = DEFAULT_PREFIX;
99 private String suffix = DEFAULT_SUFFIX;
100 private final long maxSize;
101 private final String extension;
102
103 /***
104 * Creation date for the current file.
105 * Set by {@link #createFile()}.
106 */
107 private String createTimestamp = "UNSET!!!";
108
109 /***
110 * A running sequence used making unique file names.
111 */
112 final private AtomicInteger serialNo;
113
114 /***
115 * Directories round-robin index.
116 */
117 private static int roundRobinIndex = 0;
118
119 /***
120 * NumberFormat instance for formatting serial number.
121 *
122 * Pads serial number with zeros.
123 */
124 private static NumberFormat serialNoFormatter = new DecimalFormat("00000");
125
126
127 /***
128 * Buffer to reuse writing streams.
129 */
130 private final byte [] scratchbuffer = new byte[4 * 1024];
131
132
/**
 * Constructor that writes to an already-open stream.
 *
 * <p>Use with caution. The maximum size is set to {@code -1}, so
 * {@link #checkSize()} never rotates while a stream is present; the
 * member will just keep writing. No write directories, prefix or
 * extension are configured.
 *
 * @param serialNo used to create unique filename sequences
 * @param out Where to write.
 * @param file File the <code>out</code> is connected to.
 * @param cmprs Compress the content written.
 * @param a14DigitDate Not used by this constructor.
 * @throws IOException declared in the signature; no statement in this
 * constructor throws it directly
 */
protected WriterPoolMember(AtomicInteger serialNo,
        final OutputStream out, final File file,
        final boolean cmprs, String a14DigitDate)
        throws IOException {
    this(serialNo,
            null,   // no write directories
            null,   // no prefix
            cmprs,
            -1,     // no size limit
            null);  // no extension
    this.f = file;
    this.out = out;
}
153
/**
 * Constructor for a member without a file-name suffix.
 *
 * Delegates to the seven-argument constructor, passing the empty string
 * as the suffix.
 *
 * @param serialNo used to create unique filename sequences
 * @param dirs Where to drop files.
 * @param prefix File prefix to use.
 * @param cmprs Compress the records written.
 * @param maxSize Maximum size for ARC files written.
 * @param extension Extension to give file.
 */
public WriterPoolMember(AtomicInteger serialNo,
        final List<File> dirs, final String prefix,
        final boolean cmprs, final long maxSize, final String extension) {
    this(serialNo,
            dirs,
            prefix,
            "",         // empty suffix: nothing is appended to the name
            cmprs,
            maxSize,
            extension);
}
169
/**
 * Constructor taking every configurable setting.
 *
 * Only stores the arguments; no file is opened until
 * {@link #checkSize()} or {@link #createFile()} runs.
 *
 * @param serialNo used to create unique filename sequences
 * @param dirs Where to drop files.
 * @param prefix File prefix to use.
 * @param suffix File tail to use. If null or empty, unused.
 * @param cmprs Compress the records written.
 * @param maxSize Maximum size for ARC files written.
 * @param extension Extension to give file.
 */
public WriterPoolMember(AtomicInteger serialNo,
        final List<File> dirs, final String prefix,
        final String suffix, final boolean cmprs,
        final long maxSize, final String extension) {
    // Naming pieces.
    this.serialNo = serialNo;
    this.prefix = prefix;
    this.suffix = suffix;
    this.extension = extension;
    // Where and how to write.
    this.writeDirs = dirs;
    this.compressed = cmprs;
    this.maxSize = maxSize;
}
193
194 /***
195 * Call this method just before/after any significant write.
196 *
197 * Call at the end of the writing of a record or just before we start
198 * writing a new record. Will close current file and open a new file
199 * if file size has passed out maxSize.
200 *
201 * <p>Creates and opens a file if none already open. One use of this method
202 * then is after construction, call this method to add the metadata, then
203 * call {@link #getPosition()} to find offset of first record.
204 *
205 * @exception IOException
206 */
207 public void checkSize() throws IOException {
208 if (this.out == null ||
209 (this.maxSize != -1 && (this.f.length() > this.maxSize))) {
210 createFile();
211 }
212 }
213
214 /***
215 * Create a new file.
216 * Rotates off the current Writer and creates a new in its place
217 * to take subsequent writes. Usually called from {@link #checkSize()}.
218 * @return Name of file created.
219 * @throws IOException
220 */
221 protected String createFile() throws IOException {
222 TimestampSerialno tsn = getTimestampSerialNo();
223 String name = this.prefix + '-' + getUniqueBasename(tsn) +
224 ((this.suffix == null || this.suffix.length() <= 0)?
225 "": "-" + this.suffix) + '.' + this.extension +
226 ((this.compressed)? '.' + COMPRESSED_FILE_EXTENSION: "") +
227 OCCUPIED_SUFFIX;
228 this.createTimestamp = tsn.getTimestamp();
229 File dir = getNextDirectory(this.writeDirs);
230 return createFile(new File(dir, name));
231 }
232
233 protected String createFile(final File file) throws IOException {
234 close();
235 this.f = file;
236 this.fos = new FileOutputStream(this.f);
237 this.out = new FastBufferedOutputStream(this.fos);
238 logger.info("Opened " + this.f.getAbsolutePath());
239 return this.f.getName();
240 }
241
242 /***
243 * @param dirs List of File objects that point at directories.
244 * @return Find next directory to write an arc too. If more
245 * than one, it tries to round-robin through each in turn.
246 * @throws IOException
247 */
248 protected File getNextDirectory(List<File> dirs)
249 throws IOException {
250 if (WriterPoolMember.roundRobinIndex >= dirs.size()) {
251 WriterPoolMember.roundRobinIndex = 0;
252 }
253 File d = null;
254 try {
255 d = checkWriteable((File)dirs.
256 get(WriterPoolMember.roundRobinIndex));
257 } catch (IndexOutOfBoundsException e) {
258
259
260 }
261 if (d == null && dirs.size() > 1) {
262 for (Iterator i = dirs.iterator(); d == null && i.hasNext();) {
263 d = checkWriteable((File)i.next());
264 }
265 } else {
266 WriterPoolMember.roundRobinIndex++;
267 }
268 if (d == null) {
269 throw new IOException("Directories unusable.");
270 }
271 return d;
272 }
273
274 protected File checkWriteable(File d) {
275 if (d == null) {
276 return d;
277 }
278
279 try {
280 IoUtils.ensureWriteableDirectory(d);
281 } catch(IOException e) {
282 logger.warning("Directory " + d.getPath() + " is not" +
283 " writeable or cannot be created: " + e.getMessage());
284 d = null;
285 }
286 return d;
287 }
288
289 protected synchronized TimestampSerialno getTimestampSerialNo() {
290 return getTimestampSerialNo(null);
291 }
292
293 /***
294 * Do static synchronization around getting of counter and timestamp so
295 * no chance of a thread getting in between the getting of timestamp and
296 * allocation of serial number throwing the two out of alignment.
297 *
298 * @param timestamp If non-null, use passed timestamp (must be 14 digit
299 * ARC format), else if null, timestamp with now.
300 * @return Instance of data structure that has timestamp and serial no.
301 */
302 protected synchronized TimestampSerialno
303 getTimestampSerialNo(final String timestamp) {
304 return new TimestampSerialno((timestamp != null)?
305 timestamp: ArchiveUtils.get14DigitDate(),
306 serialNo.getAndIncrement());
307 }
308
309 /***
310 * Return a unique basename.
311 *
312 * Name is timestamp + an every increasing sequence number.
313 *
314 * @param tsn Structure with timestamp and serial number.
315 *
316 * @return Unique basename.
317 */
318 private String getUniqueBasename(TimestampSerialno tsn) {
319 return tsn.getTimestamp() + "-" +
320 WriterPoolMember.serialNoFormatter.format(tsn.getSerialNumber());
321 }
322
323
324 /***
325 * Get the file name
326 *
327 * @return the filename, as if uncompressed
328 */
329 protected String getBaseFilename() {
330 String name = this.f.getName();
331 if (this.compressed && name.endsWith(DOT_COMPRESSED_FILE_EXTENSION)) {
332 return name.substring(0,name.length() - 3);
333 } else if(this.compressed &&
334 name.endsWith(DOT_COMPRESSED_FILE_EXTENSION +
335 OCCUPIED_SUFFIX)) {
336 return name.substring(0, name.length() -
337 (3 + OCCUPIED_SUFFIX.length()));
338 } else {
339 return name;
340 }
341 }
342
343 /***
344 * Get this file.
345 *
346 * Used by junit test to test for creation and when {@link WriterPool} wants
347 * to invalidate a file.
348 *
349 * @return The current file.
350 */
351 public File getFile() {
352 return this.f;
353 }
354
355 /***
356 * Post write tasks.
357 *
358 * Has side effects. Will open new file if we're at the upperbound.
359 * If we're writing compressed files, it will wrap output stream with a
360 * GZIP writer with side effect that GZIP header is written out on the
361 * stream.
362 *
363 * @exception IOException
364 */
365 protected void preWriteRecordTasks()
366 throws IOException {
367 checkSize();
368 if (this.compressed) {
369
370
371
372 this.out = new CompressedStream(this.out);
373 }
374 }
375
376 /***
377 * Post file write tasks.
378 * If compressed, finishes up compression and flushes stream so any
379 * subsequent checks get good reading.
380 *
381 * @exception IOException
382 */
383 protected void postWriteRecordTasks()
384 throws IOException {
385 if (this.compressed) {
386 CompressedStream o = (CompressedStream)this.out;
387 o.finish();
388 o.flush();
389 o.end();
390 this.out = o.getWrappedStream();
391 }
392 }
393
394 /***
395 * Postion in current physical file.
396 * Used making accounting of bytes written.
397 * @return Position in underlying file. Call before or after writing
398 * records *only* to be safe.
399 * @throws IOException
400 */
401 public long getPosition() throws IOException {
402 long position = 0;
403 if (this.out != null) {
404 this.out.flush();
405 }
406 if (this.fos != null) {
407
408
409 this.fos.flush();
410 position = this.fos.getChannel().position();
411 }
412 return position;
413 }
414
415 public boolean isCompressed() {
416 return compressed;
417 }
418
419 /***
420 * @return number of bytes written, which is always {@code b.length}
421 */
422 protected int write(final byte [] b) throws IOException {
423 this.out.write(b);
424 return b.length;
425 }
426
427 protected void flush() throws IOException {
428 this.out.flush();
429 }
430
431 /***
432 * @return
433 * @return number of bytes written, which is always {@code len}
434 */
435 protected int write(byte[] b, int off, int len) throws IOException {
436 this.out.write(b, off, len);
437 return len;
438 }
439
440 /***
441 * @return
442 * @return number of bytes written, which is always {@code 1}
443 */
444 protected int write(int b) throws IOException {
445 this.out.write(b);
446 return 1;
447 }
448
449 /***
450 * @deprecated Use {@link #copyFrom(InputStream,long,boolean)} instead
451 */
452 protected void readFullyFrom(final InputStream is, final long recordLength,
453 final byte [] b)
454 throws IOException {
455 copyFrom(is, recordLength, true);
456 }
457
458 /***
459 * @deprecated Use {@link #copyFrom(InputStream,long,boolean)} instead
460 */
461 protected void readToLimitFrom(final InputStream is, final long limit,
462 final byte [] b)
463 throws IOException {
464 copyFrom(is, limit, true);
465 }
466
467 /***
468 * Copy bytes from the provided InputStream to the target file/stream being
469 * written.
470 *
471 * @param is
472 * InputStream to copy bytes from
473 * @param recordLength
474 * expected number of bytes to copy
475 * @param enforceLength
476 * whether to throw an exception if too many/too few bytes are
477 * available from stream
478 * @return number of bytes written (normally equal to {@code enforceLength})
479 * @throws IOException
480 */
481 protected long copyFrom(final InputStream is, final long recordLength,
482 boolean enforceLength) throws IOException {
483 int read = scratchbuffer.length;
484 long tot = 0;
485 while ((tot < recordLength)
486 && (read = is.read(scratchbuffer)) != -1) {
487 int write = read;
488
489 write = (int) Math.min(write, recordLength - tot);
490 tot += read;
491 write(scratchbuffer, 0, write);
492 }
493 if (enforceLength && tot != recordLength) {
494
495 throw new IOException("Read " + tot + " but expected "
496 + recordLength);
497 }
498
499 return tot;
500 }
501
502 public void close() throws IOException {
503 if (this.out == null) {
504 return;
505 }
506 this.out.close();
507 this.out = null;
508 this.fos = null;
509 if (this.f != null && this.f.exists()) {
510 String path = this.f.getAbsolutePath();
511 if (path.endsWith(OCCUPIED_SUFFIX)) {
512 File f = new File(path.substring(0,
513 path.length() - OCCUPIED_SUFFIX.length()));
514 if (!this.f.renameTo(f)) {
515 logger.warning("Failed rename of " + path);
516 }
517 this.f = f;
518 }
519
520 logger.info("Closed " + this.f.getAbsolutePath() +
521 ", size " + this.f.length());
522 }
523 }
524
525 protected OutputStream getOutputStream() {
526 return this.out;
527 }
528
529 protected String getCreateTimestamp() {
530 return createTimestamp;
531 }
532
533
534 /***
535 * An override so we get access to underlying output stream
536 * and offer an end() that does not accompany closing underlying
537 * stream.
538 * @author stack
539 */
540 private class CompressedStream extends GZIPOutputStream {
541 public CompressedStream(OutputStream out)
542 throws IOException {
543 super(out);
544 }
545
546 /***
547 * @return Reference to stream being compressed.
548 */
549 OutputStream getWrappedStream() {
550 return this.out;
551 }
552
553 /***
554 * Release the deflater's native process resources,
555 * which otherwise would not occur until either
556 * finalization or DeflaterOutputStream.close()
557 * (which would also close underlying stream).
558 */
559 public void end() {
560 def.end();
561 }
562
563
564 }
565 }