View Javadoc

1   /*  $Id: ExperimentalWARCWriter.java 4604 2006-09-06 05:38:18Z stack-sf $
2    *
3    * Created on July 27th, 2006
4    *
5    * Copyright (C) 2006 Internet Archive.
6    *
7    * This file is part of the Heritrix web crawler (crawler.archive.org).
8    *
9    * Heritrix is free software; you can redistribute it and/or modify
10   * it under the terms of the GNU Lesser Public License as published by
11   * the Free Software Foundation; either version 2.1 of the License, or
12   * any later version.
13   *
14   * Heritrix is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   * GNU Lesser Public License for more details.
18   *
19   * You should have received a copy of the GNU Lesser Public License
20   * along with Heritrix; if not, write to the Free Software
21   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22   */
23  package org.archive.io.warc;
24  
25  import java.io.ByteArrayInputStream;
26  import java.io.File;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.io.OutputStream;
30  import java.net.URI;
31  import java.net.URISyntaxException;
32  import java.util.HashMap;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.concurrent.atomic.AtomicInteger;
37  import java.util.logging.Level;
38  import java.util.logging.Logger;
39  
40  import org.archive.io.WriterPoolMember;
41  import org.archive.uid.GeneratorFactory;
42  import org.archive.util.ArchiveUtils;
43  import org.archive.util.anvl.ANVLRecord;
44  
45  
46  /***
47   * WARC implementation.
48   *
49   * <p>Assumption is that the caller is managing access to this
50   * WARCWriter ensuring only one thread accessing this WARC instance
51   * at any one time.
52   * 
53   * <p>While being written, WARCs have a '.open' suffix appended.
54   *
55   * @author stack
56   * @version $Revision: 4604 $ $Date: 2006-09-05 22:38:18 -0700 (Tue, 05 Sep 2006) $
57   */
58  public class WARCWriter extends WriterPoolMember implements WARCConstants {
59      public static final String TOTALS = "totals";
60      public static final String SIZE_ON_DISK = "sizeOnDisk";
61      public static final String TOTAL_BYTES = "totalBytes";
62      public static final String CONTENT_BYTES = "contentBytes";
63      public static final String NUM_RECORDS = "numRecords";
64  
65      private static final Logger logger = 
66          Logger.getLogger(WARCWriter.class.getName());
67  
68      /***
69       * NEWLINE as bytes.
70       */
71      public static byte [] CRLF_BYTES;
72      static {
73          try {
74              CRLF_BYTES = CRLF.getBytes(DEFAULT_ENCODING);
75          } catch(Exception e) {
76              e.printStackTrace();
77          }
78      };
79      
80      /***
81       * Metadata.
82       */
83      private final List<String> fileMetadata;
84      
85      private Map<String,Map<String,Long>> stats; 
86      
87      /***
88       * Shutdown Constructor
89       * Has default access so can make instance to test utility methods.
90       */
91      WARCWriter() {
92          this(null, null, "", "", true, -1, null);
93      }
94      
95      /***
96       * Constructor.
97       * Takes a stream. Use with caution. There is no upperbound check on size.
98       * Will just keep writing.  Only pass Streams that are bounded. 
99       * @param serialNo  used to generate unique file name sequences
100      * @param out Where to write.
101      * @param f File the <code>out</code> is connected to.
102      * @param cmprs Compress the content written.
103      * @param a14DigitDate If null, we'll write current time.
104      * @throws IOException
105      */
106     public WARCWriter(final AtomicInteger serialNo,
107     		final OutputStream out, final File f,
108     		final boolean cmprs, final String a14DigitDate,
109             final List<String> warcinfoData)
110     throws IOException {
111         super(serialNo, out, f, cmprs, a14DigitDate);
112         this.fileMetadata = warcinfoData;
113     }
114             
115     /***
116      * Constructor.
117      *
118      * @param dirs Where to drop files.
119      * @param prefix File prefix to use.
120      * @param cmprs Compress the records written. 
121      * @param maxSize Maximum size for ARC files written.
122      * @param suffix File tail to use.  If null, unused.
123      * @param warcinfoData File metadata for warcinfo record.
124      */
125     public WARCWriter(final AtomicInteger serialNo,
126     		final List<File> dirs, final String prefix, 
127             final String suffix, final boolean cmprs,
128             final long maxSize, final List<String> warcinfoData) {
129         super(serialNo, dirs, prefix, suffix, cmprs, maxSize,
130         	WARC_FILE_EXTENSION);
131         this.fileMetadata = warcinfoData;
132     }
133     
134     @Override
135     protected String createFile(File file) throws IOException {
136     	String filename = super.createFile(file);
137     	writeWarcinfoRecord(filename);
138         return filename;
139     }
140     
141     protected void baseCharacterCheck(final char c, final String parameter)
142     throws IllegalArgumentException {
143         // TODO: Too strict?  UNICODE control characters?
144         if (Character.isISOControl(c) || !Character.isValidCodePoint(c)) {
145             throw new IllegalArgumentException("Contains illegal character 0x" +
146                 Integer.toHexString(c) + ": " + parameter);
147         }
148     }
149     
150     protected String checkHeaderValue(final String value)
151     throws IllegalArgumentException {
152         for (int i = 0; i < value.length(); i++) {
153         	final char c = value.charAt(i);
154         	baseCharacterCheck(c, value);
155         	if (Character.isWhitespace(c)) {
156                 throw new IllegalArgumentException("Contains disallowed white space 0x" +
157                     Integer.toHexString(c) + ": " + value);
158         	}
159         }
160         return value;
161     }
162     
163     protected String checkHeaderLineMimetypeParameter(final String parameter)
164     throws IOException {
165     	StringBuilder sb = new StringBuilder(parameter.length());
166     	boolean wasWhitespace = false;
167         for (int i = 0; i < parameter.length(); i++) {
168         	char c = parameter.charAt(i);
169         	if (Character.isWhitespace(c)) {
170         		// Map all to ' ' and collapse multiples into one.
171         		// TODO: Make sure white space occurs in legal location --
172         		// before parameter or inside quoted-string.
173         		if (wasWhitespace) {
174         			continue;
175         		}
176         		wasWhitespace = true;
177         		c = ' ';
178         	} else {
179         		wasWhitespace = false;
180         		baseCharacterCheck(c, parameter);
181         	}
182         	sb.append(c);
183         }
184         
185         return sb.toString();
186     }
187 
188     protected String createRecordHeader(final String type,
189     		final String url, final String create14DigitDate,
190     		final String mimetype, final URI recordId,
191     		final ANVLRecord xtraHeaders, final long contentLength)
192     throws IOException {
193     	final StringBuilder sb =
194     		new StringBuilder(2048/*A SWAG: TODO: Do analysis.*/);
195     	sb.append(WARC_ID).append(CRLF);
196         sb.append(HEADER_KEY_TYPE).append(COLON_SPACE).append(type).
197             append(CRLF);
198         // Do not write a subject-uri if not one present.
199         if (url != null && url.length() > 0) {
200             sb.append(HEADER_KEY_URI).append(COLON_SPACE).
201                 append(checkHeaderValue(url)).append(CRLF);
202         }
203         sb.append(HEADER_KEY_DATE).append(COLON_SPACE).
204             append(create14DigitDate).append(CRLF);
205         if (xtraHeaders != null) {
206             for (final Iterator i = xtraHeaders.iterator(); i.hasNext();) {
207                 sb.append(i.next()).append(CRLF);
208             }
209         }
210 
211         sb.append(HEADER_KEY_ID).append(COLON_SPACE).append('<').
212             append(recordId.toString()).append('>').append(CRLF);
213         if (contentLength > 0) {
214             sb.append(CONTENT_TYPE).append(COLON_SPACE).append(
215                 checkHeaderLineMimetypeParameter(mimetype)).append(CRLF);
216         }
217         sb.append(CONTENT_LENGTH).append(COLON_SPACE).
218             append(Long.toString(contentLength)).append(CRLF);
219     	
220     	return sb.toString();
221     }
222 
223     /***
224      * @deprecated Use {@link #writeRecord(String,String,String,String,URI,ANVLRecord,InputStream,long,boolean)} instead
225      */
226     protected void writeRecord(final String type, final String url,
227     		final String create14DigitDate, final String mimetype,
228     		final URI recordId, ANVLRecord xtraHeaders,
229             final InputStream contentStream, final long contentLength)
230     throws IOException {
231         writeRecord(type, url, create14DigitDate, mimetype, recordId, xtraHeaders, contentStream, contentLength, true);
232     }
233 
234     protected void writeRecord(final String type, final String url,
235     		final String create14DigitDate, final String mimetype,
236     		final URI recordId, ANVLRecord xtraHeaders,
237             final InputStream contentStream, final long contentLength, boolean enforceLength)
238     throws IOException {
239     	if (!TYPES_LIST.contains(type)) {
240     		throw new IllegalArgumentException("Unknown record type: " + type);
241     	}
242     	if (contentLength == 0 &&
243                 (xtraHeaders == null || xtraHeaders.size() <= 0)) {
244     		throw new IllegalArgumentException("Cannot write record " +
245     		    "of content-length zero and base headers only.");
246     	}
247     	
248     	String header;
249     	try {
250     		header = createRecordHeader(type, url,
251     				create14DigitDate, mimetype, recordId, xtraHeaders,
252     				contentLength);
253 
254     	} catch (IllegalArgumentException e) {
255     		logger.log(Level.SEVERE,"could not write record type: " + type 
256     				+ "for URL: " + url, e);
257     		return;
258     	}    	   	
259 
260     	long contentBytes = 0;
261     	long totalBytes = 0;
262     	long startPosition;
263     	
264         try {
265             checkSize(); // may start a new output file
266             startPosition = getPosition();
267             preWriteRecordTasks();
268             
269             // TODO: Revisit encoding of header.
270             totalBytes += write(header.getBytes(WARC_HEADER_ENCODING));
271 
272             if (contentStream != null && contentLength > 0) {
273                 // Write out the header/body separator.
274                 totalBytes += write(CRLF_BYTES); // TODO: should this be written even for zero-length?
275                 contentBytes += copyFrom(contentStream, contentLength, enforceLength);
276                 totalBytes += contentBytes;
277             }
278 
279             // Write out the two blank lines at end of all records, per spec
280             totalBytes += write(CRLF_BYTES);
281             totalBytes += write(CRLF_BYTES);
282         } finally {
283             postWriteRecordTasks();
284         }
285      
286         // TODO: should this be in the finally block?
287         tally(type, contentBytes, totalBytes, getPosition() - startPosition);
288     }
289     
290     // if compression is enabled, sizeOnDisk means compressed bytes; if not, it
291     // should be the same as totalBytes (right?)
292     protected void tally(String recordType, long contentBytes, long totalBytes, long sizeOnDisk) {
293         if (stats == null) {
294             stats = new HashMap<String,Map<String,Long>>();
295         }
296 
297         // add to stats for this record type
298         Map<String,Long> substats = stats.get(recordType);
299         if (substats == null) {
300             substats = new HashMap<String,Long>();
301             stats.put(recordType, substats);
302         }
303         subtally(substats, contentBytes, totalBytes, sizeOnDisk);
304         
305         // add to totals
306         substats = stats.get(TOTALS);
307         if (substats == null) {
308             substats = new HashMap<String,Long>();
309             stats.put(TOTALS, substats);
310         }
311         subtally(substats, contentBytes, totalBytes, sizeOnDisk);
312     }
313 
314     protected void subtally(Map<String,Long> substats, long contentBytes,
315             long totalBytes, long sizeOnDisk) {
316         
317         if (substats.get(NUM_RECORDS) == null) {
318             substats.put(NUM_RECORDS, 1l);
319         } else {
320             substats.put(NUM_RECORDS, substats.get(CONTENT_BYTES) + 1l);
321         }
322         
323         if (substats.get(CONTENT_BYTES) == null) {
324             substats.put(CONTENT_BYTES, contentBytes);
325         } else {
326             substats.put(CONTENT_BYTES, substats.get(CONTENT_BYTES) + contentBytes);
327         }
328         
329         if (substats.get(TOTAL_BYTES) == null) {
330             substats.put(TOTAL_BYTES, totalBytes);
331         } else {
332             substats.put(TOTAL_BYTES, substats.get(TOTAL_BYTES) + totalBytes);
333         }
334         
335         if (substats.get(SIZE_ON_DISK) == null) {
336             substats.put(SIZE_ON_DISK, sizeOnDisk);
337         } else {
338             substats.put(SIZE_ON_DISK, substats.get(SIZE_ON_DISK) + sizeOnDisk);
339         }
340     }
341 
342     protected URI generateRecordId(final Map<String, String> qualifiers)
343     throws IOException {
344     	URI rid = null;
345     	try {
346     		rid = GeneratorFactory.getFactory().
347     			getQualifiedRecordID(qualifiers);
348     	} catch (URISyntaxException e) {
349     		// Convert to IOE so can let it out.
350     		throw new IOException(e.getMessage());
351     	}
352     	return rid;
353     }
354     
355     protected URI generateRecordId(final String key, final String value)
356     throws IOException {
357     	URI rid = null;
358     	try {
359     		rid = GeneratorFactory.getFactory().
360     			getQualifiedRecordID(key, value);
361     	} catch (URISyntaxException e) {
362     		// Convert to IOE so can let it out.
363     		throw new IOException(e.getMessage());
364     	}
365     	return rid;
366     }
367     
368     public URI writeWarcinfoRecord(String filename)
369 	throws IOException {
370     	return writeWarcinfoRecord(filename, null);
371     }
372     
373     public URI writeWarcinfoRecord(String filename, final String description)
374         	throws IOException {
375         // Strip .open suffix if present.
376         if (filename.endsWith(WriterPoolMember.OCCUPIED_SUFFIX)) {
377         	filename = filename.substring(0,
378         		filename.length() - WriterPoolMember.OCCUPIED_SUFFIX.length());
379         }
380         
381         ANVLRecord headerrecord = new ANVLRecord(1);
382         headerrecord.addLabelValue(HEADER_KEY_FILENAME, filename);
383         
384         // Ugh, hate doing this but barring larger refactoring per-WARC
385         // 'metadata' is coming back as List<String> (?!?)
386         String blockfields = "";
387         if (this.fileMetadata == null) {
388             // only encountered in unit tests?
389             blockfields = "dummy: value";
390         } else {
391             for (String s :  (List<String>) fileMetadata) {
392                 blockfields += s;
393             }
394         }
395         byte[] warcinfoBody;
396         if (description != null && description.length() > 0) {
397             // reconstitute and add new description
398             ANVLRecord blockrecord = ANVLRecord.load(blockfields);
399             blockrecord.addLabelValue(CONTENT_DESCRIPTION, description);
400             warcinfoBody = blockrecord.toString().getBytes("UTF-8");
401         } else {
402             // just use in already rendered form
403             warcinfoBody = blockfields.getBytes("UTF-8");
404         }
405 
406         URI uri = writeWarcinfoRecord("application/warc-fields", headerrecord,
407             new ByteArrayInputStream(warcinfoBody), warcinfoBody.length);
408         return uri;
409     }
410     
411     /***
412      * Write a warcinfo to current file.
413      * TODO: Write crawl metadata or pointers to crawl description.
414      * @param mimetype Mimetype of the <code>fileMetadata</code> block.
415      * @param namedFields Named fields. Pass <code>null</code> if none.
416      * @param fileMetadata Metadata about this WARC as RDF, ANVL, etc.
417      * @param fileMetadataLength Length of <code>fileMetadata</code>.
418      * @throws IOException
419      * @return Generated record-id made with
420      * <a href="http://en.wikipedia.org/wiki/Data:_URL">data: scheme</a> and
421      * the current filename.
422      */
423     public URI writeWarcinfoRecord(final String mimetype,
424     	final ANVLRecord namedFields, final InputStream fileMetadata,
425     	final long fileMetadataLength)
426     throws IOException {
427     	final URI recordid = generateRecordId(TYPE, WARCINFO);
428     	writeWarcinfoRecord(ArchiveUtils.getLog14Date(), mimetype, recordid,
429             namedFields, fileMetadata, fileMetadataLength);
430     	return recordid;
431     }
432     
433     /***
434      * Write a <code>warcinfo</code> to current file.
435      * The <code>warcinfo</code> type uses its <code>recordId</code> as its URL.
436      * @param recordId URI to use for this warcinfo.
437      * @param create14DigitDate Record creation date as 14 digit date.
438      * @param mimetype Mimetype of the <code>fileMetadata</code>.
439      * @param namedFields Named fields.
440      * @param fileMetadata Metadata about this WARC as RDF, ANVL, etc.
441      * @param fileMetadataLength Length of <code>fileMetadata</code>.
442      * @throws IOException
443      */
444     public void writeWarcinfoRecord(final String create14DigitDate,
445         final String mimetype, final URI recordId, final ANVLRecord namedFields,
446     	final InputStream fileMetadata, final long fileMetadataLength)
447     throws IOException {
448     	writeRecord(WARCINFO, null, create14DigitDate, mimetype,
449         	recordId, namedFields, fileMetadata, fileMetadataLength, true);
450     }
451     
452     public void writeRequestRecord(final String url,
453         final String create14DigitDate, final String mimetype,
454         final URI recordId,
455         final ANVLRecord namedFields, final InputStream request,
456         final long requestLength)
457     throws IOException {
458         writeRecord(REQUEST, url, create14DigitDate,
459             mimetype, recordId, namedFields, request,
460             requestLength, true);
461     }
462     
463     public void writeResourceRecord(final String url,
464             final String create14DigitDate, final String mimetype,
465             final ANVLRecord namedFields, final InputStream response,
466             final long responseLength)
467     throws IOException {
468     	writeResourceRecord(url, create14DigitDate, mimetype, getRecordID(),
469     			namedFields, response, responseLength);
470     }
471     
472     public void writeResourceRecord(final String url,
473             final String create14DigitDate, final String mimetype,
474             final URI recordId,
475             final ANVLRecord namedFields, final InputStream response,
476             final long responseLength)
477     throws IOException {
478         writeRecord(RESOURCE, url, create14DigitDate,
479             mimetype, recordId, namedFields, response,
480             responseLength, true);
481     }
482 
483     public void writeResponseRecord(final String url,
484             final String create14DigitDate, final String mimetype,
485             final URI recordId,
486             final ANVLRecord namedFields, final InputStream response,
487             final long responseLength)
488     throws IOException {
489         writeRecord(RESPONSE, url, create14DigitDate,
490             mimetype, recordId, namedFields, response,
491             responseLength, true);
492     }
493     
494     public void writeRevisitRecord(final String url,
495             final String create14DigitDate, final String mimetype,
496             final URI recordId,
497             final ANVLRecord namedFields, final InputStream response,
498             final long responseLength)
499     throws IOException {
500         writeRecord(REVISIT, url, create14DigitDate,
501             mimetype, recordId, namedFields, response,
502             responseLength, false);
503     }
504     
505     public void writeMetadataRecord(final String url,
506             final String create14DigitDate, final String mimetype,
507             final URI recordId,
508             final ANVLRecord namedFields, final InputStream metadata,
509             final long metadataLength)
510     throws IOException {
511         writeRecord(METADATA, url, create14DigitDate,
512             mimetype, recordId, namedFields, metadata,
513             metadataLength, true);
514     }
515     
516     /***
517      * Convenience method for getting Record-Ids.
518      * @return A record ID.
519      * @throws IOException
520      */
521     public static URI getRecordID() throws IOException {
522         URI result;
523         try {
524             result = GeneratorFactory.getFactory().getRecordID();
525         } catch (URISyntaxException e) {
526             throw new IOException(e.toString());
527         }
528         return result;
529     }
530 
531     public void resetStats() {
532         if (stats != null) {
533             for (Map<String,Long> substats : stats.values()) {
534                 for (Map.Entry<String,Long> entry : substats.entrySet()) {
535                     entry.setValue(0l);
536                 }
537             }
538         }
539     }
540 
541     public Map<String,Map<String,Long>> getStats() {
542         return stats;
543     }
544 
545     public static long getStat(Map<String,Map<String,Long>> statz, String key, String subkey) {
546         if (statz != null && statz.get(key) != null && statz.get(key).get(subkey) != null) {
547             return statz.get(key).get(subkey);
548         } else {
549             return 0;
550         }
551     }
552 }