View Javadoc

1   /*
2    * ARCWriter
3    *
4    * $Id: ARCWriterProcessor.java 6439 2009-08-06 01:14:47Z gojomo $
5    *
6    * Created on Jun 5, 2003
7    *
8    * Copyright (C) 2003 Internet Archive.
9    *
10   * This file is part of the Heritrix web crawler (crawler.archive.org).
11   *
12   * Heritrix is free software; you can redistribute it and/or modify
13   * it under the terms of the GNU Lesser Public License as published by
14   * the Free Software Foundation; either version 2.1 of the License, or
15   * any later version.
16   *
17   * Heritrix is distributed in the hope that it will be useful,
18   * but WITHOUT ANY WARRANTY; without even the implied warranty of
19   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20   * GNU Lesser Public License for more details.
21   *
22   * You should have received a copy of the GNU Lesser Public License
23   * along with Heritrix; if not, write to the Free Software
24   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25   */
26  package org.archive.crawler.writer;
27  
28  import java.io.IOException;
29  import java.io.InputStream;
30  import java.util.concurrent.atomic.AtomicInteger;
31  import java.util.logging.Level;
32  import java.util.logging.Logger;
33  
34  import org.apache.commons.io.IOUtils;
35  import org.archive.crawler.datamodel.CoreAttributeConstants;
36  import org.archive.crawler.datamodel.CrawlURI;
37  import org.archive.crawler.datamodel.FetchStatusCodes;
38  import org.archive.crawler.event.CrawlStatusListener;
39  import org.archive.crawler.framework.WriterPoolProcessor;
40  import org.archive.io.ReplayInputStream;
41  import org.archive.io.WriterPoolMember;
42  import org.archive.io.WriterPoolSettings;
43  import org.archive.io.arc.ARCConstants;
44  import org.archive.io.arc.ARCWriter;
45  import org.archive.io.arc.ARCWriterPool;
46  
47  
48  /***
49   * Processor module for writing the results of successful fetches (and
50   * perhaps someday, certain kinds of network failures) to the Internet Archive
51   * ARC file format.
52   *
53   * Assumption is that there is only one of these ARCWriterProcessors per
54   * Heritrix instance.
55   *
56   * @author Parker Thompson
57   */
58  public class ARCWriterProcessor extends WriterPoolProcessor
59  implements CoreAttributeConstants, ARCConstants, CrawlStatusListener,
60  WriterPoolSettings, FetchStatusCodes {
61  	private static final long serialVersionUID = 1957518408532644531L;
62  
63  	private final Logger logger = Logger.getLogger(this.getClass().getName());
64      
65      public long getDefaultMaxFileSize() {
66          return 100000000L; // 100 SI mega-bytes (10^8 bytes), by tradition
67      }
68      
69      /***
70       * Default path list.
71       */
72      private static final String [] DEFAULT_PATH = {"arcs"};
73  
74      /***
75       * @param name Name of this writer.
76       */
77      public ARCWriterProcessor(String name) {
78          super(name, "ARCWriter processor");
79      }
80      
81      protected String [] getDefaultPath() {
82      	return DEFAULT_PATH;
83  	}
84  
85      protected void setupPool(final AtomicInteger serialNo) {
86  		setPool(new ARCWriterPool(serialNo, this, getPoolMaximumActive(),
87              getPoolMaximumWait()));
88      }
89      
90      /***
91       * Writes a CrawlURI and its associated data to store file.
92       *
93       * Currently this method understands the following uri types: dns, http, 
94       * and https.
95       *
96       * @param curi CrawlURI to process.
97       */
98      protected void innerProcess(CrawlURI curi) {
99          // If failure, or we haven't fetched the resource yet, return
100         if (curi.getFetchStatus() <= 0) {
101             return;
102         }
103         
104         // XXX This happens normally with ftp (empty directory or equivalent of
105         // 40x). We should write a record here, but we're not worrying about
106         // full ftp support for arcs, just warcs.
107         if (curi.getHttpRecorder() == null) {
108             return;
109         }
110         
111         // If no recorded content at all, don't write record.
112         long recordLength = curi.getHttpRecorder().getRecordedInput().getSize();
113         if (recordLength <= 0) {
114         	// getContentSize() should be > 0 if any material (even just
115             // HTTP headers with zero-length body) is available. 
116         	return;
117         }
118         
119         ReplayInputStream ris = null; 
120         try {
121             if(shouldWrite(curi)) {
122                 ris = curi.getHttpRecorder().getRecordedInput()
123                         .getReplayInputStream();
124                 write(curi, recordLength, ris, getHostAddress(curi));
125             } else {
126                 logger.info("does not write " + curi.toString());
127             }
128         } catch (IOException e) {
129             curi.addLocalizedError(this.getName(), e, "WriteRecord: " +
130                 curi.toString());
131             logger.log(Level.SEVERE, "Failed write of Record: " +
132                 curi.toString(), e);
133         } finally {
134             IOUtils.closeQuietly(ris); 
135         }
136     }
137     
138     protected void write(CrawlURI curi, long recordLength, InputStream in,
139         String ip)
140     throws IOException {
141         WriterPoolMember writer = getPool().borrowFile();
142         long position = writer.getPosition();
143         // See if we need to open a new file because we've exceeed maxBytes.
144         // Call to checkFileSize will open new file if we're at maximum for
145         // current file.
146         writer.checkSize();
147         if (writer.getPosition() != position) {
148             // We just closed the file because it was larger than maxBytes.
149             // Add to the totalBytesWritten the size of the first record
150             // in the file, if any.
151             setTotalBytesWritten(getTotalBytesWritten() +
152             	(writer.getPosition() - position));
153             position = writer.getPosition();
154         }
155         
156         ARCWriter w = (ARCWriter)writer;
157         try {
158             if (in instanceof ReplayInputStream) {
159                 w.write(curi.toString(), curi.getContentType(),
160                     ip, curi.getLong(A_FETCH_BEGAN_TIME),
161                     recordLength, (ReplayInputStream)in);
162             } else {
163                 w.write(curi.toString(), curi.getContentType(),
164                     ip, curi.getLong(A_FETCH_BEGAN_TIME),
165                     recordLength, in);
166             }
167         } catch (IOException e) {
168             // Invalidate this file (It gets a '.invalid' suffix).
169             getPool().invalidateFile(writer);
170             // Set the writer to null otherwise the pool accounting
171             // of how many active writers gets skewed if we subsequently
172             // do a returnWriter call on this object in the finally block.
173             writer = null;
174             throw e;
175         } finally {
176             if (writer != null) {
177             	setTotalBytesWritten(getTotalBytesWritten() +
178             	     (writer.getPosition() - position));
179                 getPool().returnFile(writer);
180             }
181         }
182         checkBytesWritten();
183     }
184     
185     @Override
186     protected String getFirstrecordStylesheet() {
187         return "/arcMetaheaderBody.xsl";
188     }
189 }