1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.archive.crawler.io;
24
25 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
26 import it.unimi.dsi.mg4j.util.MutableString;
27
28 import java.io.BufferedInputStream;
29 import java.io.BufferedReader;
30 import java.io.File;
31 import java.io.FileInputStream;
32 import java.io.FileNotFoundException;
33 import java.io.FileOutputStream;
34 import java.io.IOException;
35 import java.io.InputStream;
36 import java.io.InputStreamReader;
37 import java.io.OutputStreamWriter;
38 import java.io.Writer;
39 import java.net.URL;
40 import java.net.URLConnection;
41 import java.util.zip.GZIPInputStream;
42 import java.util.zip.GZIPOutputStream;
43
44 import org.archive.util.ArchiveUtils;
45
46 /***
47 * Utility class for a crawler journal/log that is compressed and
48 * rotates by serial number at checkpoints.
49 *
50 * @author gojomo
51 */
52 public class CrawlerJournal {
53
54 /*** prefix for error lines*/
55 public static final String LOG_ERROR = "E ";
56 /*** prefix for timestamp lines */
57 public static final String LOG_TIMESTAMP = "T ";
58
59 /***
60 * Get a BufferedReader on the crawler journal given
61 *
62 * @param source File journal
63 * @return journal buffered reader.
64 * @throws IOException
65 */
66 public static BufferedReader getBufferedReader(File source) throws IOException {
67 boolean isGzipped = source.getName().toLowerCase().
68 endsWith(GZIP_SUFFIX);
69 FileInputStream fis = new FileInputStream(source);
70 return new BufferedReader(isGzipped?
71 new InputStreamReader(new GZIPInputStream(fis)):
72 new InputStreamReader(fis));
73 }
74
75 /***
76 * Get a BufferedReader on the crawler journal given.
77 *
78 * @param source URL journal
79 * @return journal buffered reader.
80 * @throws IOException
81 */
82 public static BufferedReader getBufferedReader(URL source) throws IOException {
83 URLConnection conn = source.openConnection();
84 boolean isGzipped = conn.getContentType() != null && conn.getContentType().equalsIgnoreCase("application/x-gzip")
85 || conn.getContentEncoding() != null && conn.getContentEncoding().equalsIgnoreCase("gzip");
86 InputStream uis = conn.getInputStream();
87 return new BufferedReader(isGzipped?
88 new InputStreamReader(new GZIPInputStream(uis)):
89 new InputStreamReader(uis));
90
91 }
92
93 /***
94 * Get a BufferedInputStream on the recovery file given.
95 *
96 * @param source file to open
97 * @return journal buffered input stream.
98 * @throws IOException
99 */
100 public static BufferedInputStream getBufferedInput(File source) throws IOException {
101 boolean isGzipped = source.getName().toLowerCase().
102 endsWith(GZIP_SUFFIX);
103 FileInputStream fis = new FileInputStream(source);
104 return isGzipped ? new BufferedInputStream(new GZIPInputStream(fis))
105 : new BufferedInputStream(fis);
106 }
107
108 /***
109 * Stream on which we record frontier events.
110 */
111 protected Writer out = null;
112
113 /*** line count */
114 protected long lines = 0;
115 /*** number of lines between timestamps */
116 protected int timestamp_interval = 0;
117
118
119 /*** suffix to recognize gzipped files */
120 public static final String GZIP_SUFFIX = ".gz";
121
122 /***
123 * File we're writing journal to.
124 * Keep a reference in case we want to rotate it off.
125 */
126 protected File gzipFile = null;
127
128 /***
129 * Create a new crawler journal at the given location
130 *
131 * @param path Directory to make thejournal in.
132 * @param filename Name to use for journal file.
133 * @throws IOException
134 */
135 public CrawlerJournal(String path, String filename)
136 throws IOException {
137 this.gzipFile = new File(path, filename);
138 this.out = initialize(gzipFile);
139 }
140
141 /***
142 * Create a new crawler journal at the given location
143 *
144 * @param file path at which to make journal
145 * @throws IOException
146 */
147 public CrawlerJournal(File file) throws IOException {
148 this.gzipFile = file;
149 this.out = initialize(gzipFile);
150 }
151
152 /***
153 * Allocate a buffer for accumulating lines to write and reuse it.
154 */
155 protected MutableString accumulatingBuffer = new MutableString(1024);
156
157 protected Writer initialize(final File f) throws FileNotFoundException, IOException {
158 return new OutputStreamWriter(new GZIPOutputStream(
159 new FastBufferedOutputStream(new FileOutputStream(f))));
160 }
161
162 /***
163 * Write a line
164 *
165 * @param string String
166 */
167 public synchronized void writeLine(String string) {
168 try {
169 this.out.write("\n");
170 this.out.write(string);
171 noteLine();
172 } catch (IOException e) {
173 e.printStackTrace();
174 }
175 }
176
177 /***
178 * Write a line of two strings
179 *
180 * @param s1 String
181 * @param s2 String
182 */
183 public synchronized void writeLine(String s1, String s2) {
184 try {
185 this.out.write("\n");
186 this.out.write(s1);
187 this.out.write(s2);
188 noteLine();
189 } catch (IOException e) {
190 e.printStackTrace();
191 }
192 }
193
194 /***
195 * Write a line of three strings
196 *
197 * @param s1 String
198 * @param s2 String
199 * @param s3 String
200 */
201 public synchronized void writeLine(String s1, String s2, String s3) {
202 try {
203 this.out.write("\n");
204 this.out.write(s1);
205 this.out.write(s2);
206 this.out.write(s3);
207 noteLine();
208 } catch (IOException e) {
209 e.printStackTrace();
210 }
211 }
212
213 /***
214 * Write a line.
215 *
216 * @param mstring MutableString to write
217 */
218 public synchronized void writeLine(MutableString mstring) {
219 if (this.out == null) {
220 return;
221 }
222 try {
223 this.out.write("\n");
224 mstring.write(out);
225 noteLine();
226 } catch (IOException e) {
227 e.printStackTrace();
228 }
229 }
230
231 /***
232 * Count and note a line
233 *
234 * @throws IOException
235 */
236 protected void noteLine() throws IOException {
237 lines++;
238 considerTimestamp();
239 }
240
241 /***
242 * Write a timestamp line if appropriate
243 *
244 * @throws IOException
245 */
246 protected void considerTimestamp() throws IOException {
247 if(timestamp_interval > 0 && lines % timestamp_interval == 0) {
248 out.write("\n");
249 out.write(LOG_TIMESTAMP);
250 out.write(ArchiveUtils.getLog14Date());
251 }
252 }
253
254 /***
255 * Flush and close the underlying IO objects.
256 */
257 public void close() {
258 if (this.out == null) {
259 return;
260 }
261 try {
262 this.out.flush();
263 this.out.close();
264 this.out = null;
265 } catch (IOException e) {
266 e.printStackTrace();
267 }
268 }
269
270 /***
271 * Note a serious error vioa a special log line
272 *
273 * @param err
274 */
275 public void seriousError(String err) {
276 writeLine("\n"+LOG_ERROR+ArchiveUtils.getLog14Date()+" "+err);
277 }
278
279 /***
280 * Handle a checkpoint by rotating the current log to a checkpoint-named
281 * file and starting a new log.
282 *
283 * @param checkpointDir
284 * @throws IOException
285 */
286 public synchronized void checkpoint(final File checkpointDir) throws IOException {
287 if (this.out == null || !this.gzipFile.exists()) {
288 return;
289 }
290 close();
291
292 this.gzipFile.renameTo(new File(this.gzipFile.getParentFile(),
293 this.gzipFile.getName() + "." + checkpointDir.getName()));
294
295 this.out = initialize(this.gzipFile);
296 }
297
298 }