1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package org.archive.crawler.writer;
27
28 import java.io.IOException;
29 import java.io.InputStream;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.logging.Level;
32 import java.util.logging.Logger;
33
34 import org.apache.commons.io.IOUtils;
35 import org.archive.crawler.datamodel.CoreAttributeConstants;
36 import org.archive.crawler.datamodel.CrawlURI;
37 import org.archive.crawler.datamodel.FetchStatusCodes;
38 import org.archive.crawler.event.CrawlStatusListener;
39 import org.archive.crawler.framework.WriterPoolProcessor;
40 import org.archive.io.ReplayInputStream;
41 import org.archive.io.WriterPoolMember;
42 import org.archive.io.WriterPoolSettings;
43 import org.archive.io.arc.ARCConstants;
44 import org.archive.io.arc.ARCWriter;
45 import org.archive.io.arc.ARCWriterPool;
46
47
48 /***
49 * Processor module for writing the results of successful fetches (and
50 * perhaps someday, certain kinds of network failures) to the Internet Archive
51 * ARC file format.
52 *
53 * Assumption is that there is only one of these ARCWriterProcessors per
54 * Heritrix instance.
55 *
56 * @author Parker Thompson
57 */
58 public class ARCWriterProcessor extends WriterPoolProcessor
59 implements CoreAttributeConstants, ARCConstants, CrawlStatusListener,
60 WriterPoolSettings, FetchStatusCodes {
61 private static final long serialVersionUID = 1957518408532644531L;
62
63 private final Logger logger = Logger.getLogger(this.getClass().getName());
64
65 public long getDefaultMaxFileSize() {
66 return 100000000L;
67 }
68
69 /***
70 * Default path list.
71 */
72 private static final String [] DEFAULT_PATH = {"arcs"};
73
74 /***
75 * @param name Name of this writer.
76 */
77 public ARCWriterProcessor(String name) {
78 super(name, "ARCWriter processor");
79 }
80
81 protected String [] getDefaultPath() {
82 return DEFAULT_PATH;
83 }
84
85 protected void setupPool(final AtomicInteger serialNo) {
86 setPool(new ARCWriterPool(serialNo, this, getPoolMaximumActive(),
87 getPoolMaximumWait()));
88 }
89
90 /***
91 * Writes a CrawlURI and its associated data to store file.
92 *
93 * Currently this method understands the following uri types: dns, http,
94 * and https.
95 *
96 * @param curi CrawlURI to process.
97 */
98 protected void innerProcess(CrawlURI curi) {
99
100 if (curi.getFetchStatus() <= 0) {
101 return;
102 }
103
104
105
106
107 if (curi.getHttpRecorder() == null) {
108 return;
109 }
110
111
112 long recordLength = curi.getHttpRecorder().getRecordedInput().getSize();
113 if (recordLength <= 0) {
114
115
116 return;
117 }
118
119 ReplayInputStream ris = null;
120 try {
121 if(shouldWrite(curi)) {
122 ris = curi.getHttpRecorder().getRecordedInput()
123 .getReplayInputStream();
124 write(curi, recordLength, ris, getHostAddress(curi));
125 } else {
126 logger.info("does not write " + curi.toString());
127 }
128 } catch (IOException e) {
129 curi.addLocalizedError(this.getName(), e, "WriteRecord: " +
130 curi.toString());
131 logger.log(Level.SEVERE, "Failed write of Record: " +
132 curi.toString(), e);
133 } finally {
134 IOUtils.closeQuietly(ris);
135 }
136 }
137
138 protected void write(CrawlURI curi, long recordLength, InputStream in,
139 String ip)
140 throws IOException {
141 WriterPoolMember writer = getPool().borrowFile();
142 long position = writer.getPosition();
143
144
145
146 writer.checkSize();
147 if (writer.getPosition() != position) {
148
149
150
151 setTotalBytesWritten(getTotalBytesWritten() +
152 (writer.getPosition() - position));
153 position = writer.getPosition();
154 }
155
156 ARCWriter w = (ARCWriter)writer;
157 try {
158 if (in instanceof ReplayInputStream) {
159 w.write(curi.toString(), curi.getContentType(),
160 ip, curi.getLong(A_FETCH_BEGAN_TIME),
161 recordLength, (ReplayInputStream)in);
162 } else {
163 w.write(curi.toString(), curi.getContentType(),
164 ip, curi.getLong(A_FETCH_BEGAN_TIME),
165 recordLength, in);
166 }
167 } catch (IOException e) {
168
169 getPool().invalidateFile(writer);
170
171
172
173 writer = null;
174 throw e;
175 } finally {
176 if (writer != null) {
177 setTotalBytesWritten(getTotalBytesWritten() +
178 (writer.getPosition() - position));
179 getPool().returnFile(writer);
180 }
181 }
182 checkBytesWritten();
183 }
184
185 @Override
186 protected String getFirstrecordStylesheet() {
187 return "/arcMetaheaderBody.xsl";
188 }
189 }