1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.archive.io.warc;
24
25 import java.io.ByteArrayInputStream;
26 import java.io.File;
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.io.OutputStream;
30 import java.net.URI;
31 import java.net.URISyntaxException;
32 import java.util.HashMap;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.logging.Level;
38 import java.util.logging.Logger;
39
40 import org.archive.io.WriterPoolMember;
41 import org.archive.uid.GeneratorFactory;
42 import org.archive.util.ArchiveUtils;
43 import org.archive.util.anvl.ANVLRecord;
44
45
46 /***
47 * WARC implementation.
48 *
49 * <p>Assumption is that the caller is managing access to this
50 * WARCWriter ensuring only one thread accessing this WARC instance
51 * at any one time.
52 *
53 * <p>While being written, WARCs have a '.open' suffix appended.
54 *
55 * @author stack
56 * @version $Revision: 4604 $ $Date: 2006-09-05 22:38:18 -0700 (Tue, 05 Sep 2006) $
57 */
58 public class WARCWriter extends WriterPoolMember implements WARCConstants {
59 public static final String TOTALS = "totals";
60 public static final String SIZE_ON_DISK = "sizeOnDisk";
61 public static final String TOTAL_BYTES = "totalBytes";
62 public static final String CONTENT_BYTES = "contentBytes";
63 public static final String NUM_RECORDS = "numRecords";
64
65 private static final Logger logger =
66 Logger.getLogger(WARCWriter.class.getName());
67
68 /***
69 * NEWLINE as bytes.
70 */
71 public static byte [] CRLF_BYTES;
72 static {
73 try {
74 CRLF_BYTES = CRLF.getBytes(DEFAULT_ENCODING);
75 } catch(Exception e) {
76 e.printStackTrace();
77 }
78 };
79
80 /***
81 * Metadata.
82 */
83 private final List<String> fileMetadata;
84
85 private Map<String,Map<String,Long>> stats;
86
87 /***
88 * Shutdown Constructor
89 * Has default access so can make instance to test utility methods.
90 */
91 WARCWriter() {
92 this(null, null, "", "", true, -1, null);
93 }
94
95 /***
96 * Constructor.
97 * Takes a stream. Use with caution. There is no upperbound check on size.
98 * Will just keep writing. Only pass Streams that are bounded.
99 * @param serialNo used to generate unique file name sequences
100 * @param out Where to write.
101 * @param f File the <code>out</code> is connected to.
102 * @param cmprs Compress the content written.
103 * @param a14DigitDate If null, we'll write current time.
104 * @throws IOException
105 */
106 public WARCWriter(final AtomicInteger serialNo,
107 final OutputStream out, final File f,
108 final boolean cmprs, final String a14DigitDate,
109 final List<String> warcinfoData)
110 throws IOException {
111 super(serialNo, out, f, cmprs, a14DigitDate);
112 this.fileMetadata = warcinfoData;
113 }
114
115 /***
116 * Constructor.
117 *
118 * @param dirs Where to drop files.
119 * @param prefix File prefix to use.
120 * @param cmprs Compress the records written.
121 * @param maxSize Maximum size for ARC files written.
122 * @param suffix File tail to use. If null, unused.
123 * @param warcinfoData File metadata for warcinfo record.
124 */
125 public WARCWriter(final AtomicInteger serialNo,
126 final List<File> dirs, final String prefix,
127 final String suffix, final boolean cmprs,
128 final long maxSize, final List<String> warcinfoData) {
129 super(serialNo, dirs, prefix, suffix, cmprs, maxSize,
130 WARC_FILE_EXTENSION);
131 this.fileMetadata = warcinfoData;
132 }
133
134 @Override
135 protected String createFile(File file) throws IOException {
136 String filename = super.createFile(file);
137 writeWarcinfoRecord(filename);
138 return filename;
139 }
140
141 protected void baseCharacterCheck(final char c, final String parameter)
142 throws IllegalArgumentException {
143
144 if (Character.isISOControl(c) || !Character.isValidCodePoint(c)) {
145 throw new IllegalArgumentException("Contains illegal character 0x" +
146 Integer.toHexString(c) + ": " + parameter);
147 }
148 }
149
150 protected String checkHeaderValue(final String value)
151 throws IllegalArgumentException {
152 for (int i = 0; i < value.length(); i++) {
153 final char c = value.charAt(i);
154 baseCharacterCheck(c, value);
155 if (Character.isWhitespace(c)) {
156 throw new IllegalArgumentException("Contains disallowed white space 0x" +
157 Integer.toHexString(c) + ": " + value);
158 }
159 }
160 return value;
161 }
162
163 protected String checkHeaderLineMimetypeParameter(final String parameter)
164 throws IOException {
165 StringBuilder sb = new StringBuilder(parameter.length());
166 boolean wasWhitespace = false;
167 for (int i = 0; i < parameter.length(); i++) {
168 char c = parameter.charAt(i);
169 if (Character.isWhitespace(c)) {
170
171
172
173 if (wasWhitespace) {
174 continue;
175 }
176 wasWhitespace = true;
177 c = ' ';
178 } else {
179 wasWhitespace = false;
180 baseCharacterCheck(c, parameter);
181 }
182 sb.append(c);
183 }
184
185 return sb.toString();
186 }
187
188 protected String createRecordHeader(final String type,
189 final String url, final String create14DigitDate,
190 final String mimetype, final URI recordId,
191 final ANVLRecord xtraHeaders, final long contentLength)
192 throws IOException {
193 final StringBuilder sb =
194 new StringBuilder(2048
195 sb.append(WARC_ID).append(CRLF);
196 sb.append(HEADER_KEY_TYPE).append(COLON_SPACE).append(type).
197 append(CRLF);
198
199 if (url != null && url.length() > 0) {
200 sb.append(HEADER_KEY_URI).append(COLON_SPACE).
201 append(checkHeaderValue(url)).append(CRLF);
202 }
203 sb.append(HEADER_KEY_DATE).append(COLON_SPACE).
204 append(create14DigitDate).append(CRLF);
205 if (xtraHeaders != null) {
206 for (final Iterator i = xtraHeaders.iterator(); i.hasNext();) {
207 sb.append(i.next()).append(CRLF);
208 }
209 }
210
211 sb.append(HEADER_KEY_ID).append(COLON_SPACE).append('<').
212 append(recordId.toString()).append('>').append(CRLF);
213 if (contentLength > 0) {
214 sb.append(CONTENT_TYPE).append(COLON_SPACE).append(
215 checkHeaderLineMimetypeParameter(mimetype)).append(CRLF);
216 }
217 sb.append(CONTENT_LENGTH).append(COLON_SPACE).
218 append(Long.toString(contentLength)).append(CRLF);
219
220 return sb.toString();
221 }
222
223 /***
224 * @deprecated Use {@link #writeRecord(String,String,String,String,URI,ANVLRecord,InputStream,long,boolean)} instead
225 */
226 protected void writeRecord(final String type, final String url,
227 final String create14DigitDate, final String mimetype,
228 final URI recordId, ANVLRecord xtraHeaders,
229 final InputStream contentStream, final long contentLength)
230 throws IOException {
231 writeRecord(type, url, create14DigitDate, mimetype, recordId, xtraHeaders, contentStream, contentLength, true);
232 }
233
234 protected void writeRecord(final String type, final String url,
235 final String create14DigitDate, final String mimetype,
236 final URI recordId, ANVLRecord xtraHeaders,
237 final InputStream contentStream, final long contentLength, boolean enforceLength)
238 throws IOException {
239 if (!TYPES_LIST.contains(type)) {
240 throw new IllegalArgumentException("Unknown record type: " + type);
241 }
242 if (contentLength == 0 &&
243 (xtraHeaders == null || xtraHeaders.size() <= 0)) {
244 throw new IllegalArgumentException("Cannot write record " +
245 "of content-length zero and base headers only.");
246 }
247
248 String header;
249 try {
250 header = createRecordHeader(type, url,
251 create14DigitDate, mimetype, recordId, xtraHeaders,
252 contentLength);
253
254 } catch (IllegalArgumentException e) {
255 logger.log(Level.SEVERE,"could not write record type: " + type
256 + "for URL: " + url, e);
257 return;
258 }
259
260 long contentBytes = 0;
261 long totalBytes = 0;
262 long startPosition;
263
264 try {
265 checkSize();
266 startPosition = getPosition();
267 preWriteRecordTasks();
268
269
270 totalBytes += write(header.getBytes(WARC_HEADER_ENCODING));
271
272 if (contentStream != null && contentLength > 0) {
273
274 totalBytes += write(CRLF_BYTES);
275 contentBytes += copyFrom(contentStream, contentLength, enforceLength);
276 totalBytes += contentBytes;
277 }
278
279
280 totalBytes += write(CRLF_BYTES);
281 totalBytes += write(CRLF_BYTES);
282 } finally {
283 postWriteRecordTasks();
284 }
285
286
287 tally(type, contentBytes, totalBytes, getPosition() - startPosition);
288 }
289
290
291
292 protected void tally(String recordType, long contentBytes, long totalBytes, long sizeOnDisk) {
293 if (stats == null) {
294 stats = new HashMap<String,Map<String,Long>>();
295 }
296
297
298 Map<String,Long> substats = stats.get(recordType);
299 if (substats == null) {
300 substats = new HashMap<String,Long>();
301 stats.put(recordType, substats);
302 }
303 subtally(substats, contentBytes, totalBytes, sizeOnDisk);
304
305
306 substats = stats.get(TOTALS);
307 if (substats == null) {
308 substats = new HashMap<String,Long>();
309 stats.put(TOTALS, substats);
310 }
311 subtally(substats, contentBytes, totalBytes, sizeOnDisk);
312 }
313
314 protected void subtally(Map<String,Long> substats, long contentBytes,
315 long totalBytes, long sizeOnDisk) {
316
317 if (substats.get(NUM_RECORDS) == null) {
318 substats.put(NUM_RECORDS, 1l);
319 } else {
320 substats.put(NUM_RECORDS, substats.get(CONTENT_BYTES) + 1l);
321 }
322
323 if (substats.get(CONTENT_BYTES) == null) {
324 substats.put(CONTENT_BYTES, contentBytes);
325 } else {
326 substats.put(CONTENT_BYTES, substats.get(CONTENT_BYTES) + contentBytes);
327 }
328
329 if (substats.get(TOTAL_BYTES) == null) {
330 substats.put(TOTAL_BYTES, totalBytes);
331 } else {
332 substats.put(TOTAL_BYTES, substats.get(TOTAL_BYTES) + totalBytes);
333 }
334
335 if (substats.get(SIZE_ON_DISK) == null) {
336 substats.put(SIZE_ON_DISK, sizeOnDisk);
337 } else {
338 substats.put(SIZE_ON_DISK, substats.get(SIZE_ON_DISK) + sizeOnDisk);
339 }
340 }
341
342 protected URI generateRecordId(final Map<String, String> qualifiers)
343 throws IOException {
344 URI rid = null;
345 try {
346 rid = GeneratorFactory.getFactory().
347 getQualifiedRecordID(qualifiers);
348 } catch (URISyntaxException e) {
349
350 throw new IOException(e.getMessage());
351 }
352 return rid;
353 }
354
355 protected URI generateRecordId(final String key, final String value)
356 throws IOException {
357 URI rid = null;
358 try {
359 rid = GeneratorFactory.getFactory().
360 getQualifiedRecordID(key, value);
361 } catch (URISyntaxException e) {
362
363 throw new IOException(e.getMessage());
364 }
365 return rid;
366 }
367
368 public URI writeWarcinfoRecord(String filename)
369 throws IOException {
370 return writeWarcinfoRecord(filename, null);
371 }
372
373 public URI writeWarcinfoRecord(String filename, final String description)
374 throws IOException {
375
376 if (filename.endsWith(WriterPoolMember.OCCUPIED_SUFFIX)) {
377 filename = filename.substring(0,
378 filename.length() - WriterPoolMember.OCCUPIED_SUFFIX.length());
379 }
380
381 ANVLRecord headerrecord = new ANVLRecord(1);
382 headerrecord.addLabelValue(HEADER_KEY_FILENAME, filename);
383
384
385
386 String blockfields = "";
387 if (this.fileMetadata == null) {
388
389 blockfields = "dummy: value";
390 } else {
391 for (String s : (List<String>) fileMetadata) {
392 blockfields += s;
393 }
394 }
395 byte[] warcinfoBody;
396 if (description != null && description.length() > 0) {
397
398 ANVLRecord blockrecord = ANVLRecord.load(blockfields);
399 blockrecord.addLabelValue(CONTENT_DESCRIPTION, description);
400 warcinfoBody = blockrecord.toString().getBytes("UTF-8");
401 } else {
402
403 warcinfoBody = blockfields.getBytes("UTF-8");
404 }
405
406 URI uri = writeWarcinfoRecord("application/warc-fields", headerrecord,
407 new ByteArrayInputStream(warcinfoBody), warcinfoBody.length);
408 return uri;
409 }
410
411 /***
412 * Write a warcinfo to current file.
413 * TODO: Write crawl metadata or pointers to crawl description.
414 * @param mimetype Mimetype of the <code>fileMetadata</code> block.
415 * @param namedFields Named fields. Pass <code>null</code> if none.
416 * @param fileMetadata Metadata about this WARC as RDF, ANVL, etc.
417 * @param fileMetadataLength Length of <code>fileMetadata</code>.
418 * @throws IOException
419 * @return Generated record-id made with
420 * <a href="http://en.wikipedia.org/wiki/Data:_URL">data: scheme</a> and
421 * the current filename.
422 */
423 public URI writeWarcinfoRecord(final String mimetype,
424 final ANVLRecord namedFields, final InputStream fileMetadata,
425 final long fileMetadataLength)
426 throws IOException {
427 final URI recordid = generateRecordId(TYPE, WARCINFO);
428 writeWarcinfoRecord(ArchiveUtils.getLog14Date(), mimetype, recordid,
429 namedFields, fileMetadata, fileMetadataLength);
430 return recordid;
431 }
432
433 /***
434 * Write a <code>warcinfo</code> to current file.
435 * The <code>warcinfo</code> type uses its <code>recordId</code> as its URL.
436 * @param recordId URI to use for this warcinfo.
437 * @param create14DigitDate Record creation date as 14 digit date.
438 * @param mimetype Mimetype of the <code>fileMetadata</code>.
439 * @param namedFields Named fields.
440 * @param fileMetadata Metadata about this WARC as RDF, ANVL, etc.
441 * @param fileMetadataLength Length of <code>fileMetadata</code>.
442 * @throws IOException
443 */
444 public void writeWarcinfoRecord(final String create14DigitDate,
445 final String mimetype, final URI recordId, final ANVLRecord namedFields,
446 final InputStream fileMetadata, final long fileMetadataLength)
447 throws IOException {
448 writeRecord(WARCINFO, null, create14DigitDate, mimetype,
449 recordId, namedFields, fileMetadata, fileMetadataLength, true);
450 }
451
452 public void writeRequestRecord(final String url,
453 final String create14DigitDate, final String mimetype,
454 final URI recordId,
455 final ANVLRecord namedFields, final InputStream request,
456 final long requestLength)
457 throws IOException {
458 writeRecord(REQUEST, url, create14DigitDate,
459 mimetype, recordId, namedFields, request,
460 requestLength, true);
461 }
462
463 public void writeResourceRecord(final String url,
464 final String create14DigitDate, final String mimetype,
465 final ANVLRecord namedFields, final InputStream response,
466 final long responseLength)
467 throws IOException {
468 writeResourceRecord(url, create14DigitDate, mimetype, getRecordID(),
469 namedFields, response, responseLength);
470 }
471
472 public void writeResourceRecord(final String url,
473 final String create14DigitDate, final String mimetype,
474 final URI recordId,
475 final ANVLRecord namedFields, final InputStream response,
476 final long responseLength)
477 throws IOException {
478 writeRecord(RESOURCE, url, create14DigitDate,
479 mimetype, recordId, namedFields, response,
480 responseLength, true);
481 }
482
483 public void writeResponseRecord(final String url,
484 final String create14DigitDate, final String mimetype,
485 final URI recordId,
486 final ANVLRecord namedFields, final InputStream response,
487 final long responseLength)
488 throws IOException {
489 writeRecord(RESPONSE, url, create14DigitDate,
490 mimetype, recordId, namedFields, response,
491 responseLength, true);
492 }
493
494 public void writeRevisitRecord(final String url,
495 final String create14DigitDate, final String mimetype,
496 final URI recordId,
497 final ANVLRecord namedFields, final InputStream response,
498 final long responseLength)
499 throws IOException {
500 writeRecord(REVISIT, url, create14DigitDate,
501 mimetype, recordId, namedFields, response,
502 responseLength, false);
503 }
504
505 public void writeMetadataRecord(final String url,
506 final String create14DigitDate, final String mimetype,
507 final URI recordId,
508 final ANVLRecord namedFields, final InputStream metadata,
509 final long metadataLength)
510 throws IOException {
511 writeRecord(METADATA, url, create14DigitDate,
512 mimetype, recordId, namedFields, metadata,
513 metadataLength, true);
514 }
515
516 /***
517 * Convenience method for getting Record-Ids.
518 * @return A record ID.
519 * @throws IOException
520 */
521 public static URI getRecordID() throws IOException {
522 URI result;
523 try {
524 result = GeneratorFactory.getFactory().getRecordID();
525 } catch (URISyntaxException e) {
526 throw new IOException(e.toString());
527 }
528 return result;
529 }
530
531 public void resetStats() {
532 if (stats != null) {
533 for (Map<String,Long> substats : stats.values()) {
534 for (Map.Entry<String,Long> entry : substats.entrySet()) {
535 entry.setValue(0l);
536 }
537 }
538 }
539 }
540
541 public Map<String,Map<String,Long>> getStats() {
542 return stats;
543 }
544
545 public static long getStat(Map<String,Map<String,Long>> statz, String key, String subkey) {
546 if (statz != null && statz.get(key) != null && statz.get(key).get(subkey) != null) {
547 return statz.get(key).get(subkey);
548 } else {
549 return 0;
550 }
551 }
552 }