1 /*
2 * Autopsy Forensic Browser
3 *
4 * Copyright 2011 Basis Technology Corp.
5 * Contact: carrier <at> sleuthkit <dot> org
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19 package org.sleuthkit.autopsy.keywordsearch;
20
21 import java.io.ByteArrayInputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.Reader;
25 import java.io.UnsupportedEncodingException;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import java.util.logging.Level;
35 import org.apache.solr.client.solrj.SolrRequest.METHOD;
36 import org.apache.solr.client.solrj.SolrServerException;
37 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
38 import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
39 import org.apache.solr.common.SolrException;
40 import org.apache.solr.common.SolrException.ErrorCode;
41 import org.apache.solr.common.util.ContentStream;
42 import org.apache.solr.common.SolrInputDocument;
43 import org.openide.util.Exceptions;
44 import org.openide.util.NbBundle;
61
65 class Ingester {
66
    private static final Logger logger = Logger.getLogger(Ingester.class.getName());
    // Set whenever a document is added to Solr without a following commit();
    // checked in finalize() to warn about potentially lost index data.
    private volatile boolean uncommitedIngests = false;
    // Single worker thread so update requests are posted to Solr one at a time.
    private final ExecutorService upRequestExecutor = Executors.newSingleThreadExecutor();
    private final Server solrServer = KeywordSearch.getServer();
    // Visitor that maps a Content object to its common Solr document fields.
    private final GetContentFieldsV getContentFieldsV = new GetContentFieldsV();
    // Lazily-created singleton instance; see getDefault().
    private static Ingester instance;

    //for ingesting chunk as SolrInputDocument (non-content-streaming, by-pass tika)
    //TODO use a streaming way to add content to /update handler
    private static final int MAX_DOC_CHUNK_SIZE = 1024*1024;
    private static final String docContentEncoding = "UTF-8"; //NON-NLS
78
79
    // Private to enforce the singleton pattern; obtain via getDefault().
    private Ingester() {
    }
82
83 public static synchronized Ingester getDefault() {
84 if (instance == null) {
85 instance = new Ingester();
86 }
87 return instance;
88 }
89
    /**
     * Warns (via the module logger) if this Ingester is being collected while
     * documents were added to Solr but never committed.
     */
    @Override
    @SuppressWarnings("FinalizeDeclaration")
    protected void finalize() throws Throwable {
        super.finalize();

        // Warn if files might have been left uncommited.
        if (uncommitedIngests) {
            logger.warning("Ingester was used to add files that it never committed."); //NON-NLS
        }
    }
100
109 void ingest(AbstractFileStringContentStream afscs) throws IngesterException {
110 Map<String, String> params = getContentFields(afscs.getSourceContent());
111 ingest(afscs, params, afscs.getSourceContent().getSize());
112 }
113
125 void ingest(TextExtractor fe) throws IngesterException {
126 Map<String, String> params = getContentFields(fe.getSourceFile());
127
128 params.put(Server.Schema.NUM_CHUNKS.toString(), Integer.toString(fe.getNumChunks()));
129
130 ingest(new NullContentStream(fe.getSourceFile()), params, 0);
131 }
132
144 void ingest(AbstractFileChunk fec, ByteContentStream bcs, int size) throws IngesterException {
145 AbstractContent sourceContent = bcs.getSourceContent();
146 Map<String, String> params = getContentFields(sourceContent);
147
148 //overwrite id with the chunk id
149 params.put(Server.Schema.ID.toString(),
150 Server.getChunkIdString(sourceContent.getId(), fec.getChunkId()));
151
152 ingest(bcs, params, size);
153 }
154
167 void ingest(AbstractFile file, boolean ingestContent) throws IngesterException {
168 if (ingestContent == false || file.isDir()) {
169 ingest(new NullContentStream(file), getContentFields(file), 0);
170 } else {
171 ingest(new FscContentStream(file), getContentFields(file), file.getSize());
172 }
173 }
174
    /**
     * Builds the base Solr field map for a content object by dispatching to
     * the GetContentFieldsV visitor.
     *
     * @param fsc content to describe
     * @return map of Solr field names to their values
     */
    private Map<String, String> getContentFields(AbstractContent fsc) {
        return fsc.accept(getContentFieldsV);
    }
184
189
190 @Override
192 return new HashMap<String, String>();
193 }
194
195 @Override
199 return params;
200 }
201
202 @Override
206 return params;
207 }
208
209 @Override
213 return params;
214 }
215
216 @Override
218 // layout files do not have times
220 }
221
222 @Override
226 return params;
227 }
228
234 return params;
235 }
236
237
239 Map<String, String> params = new HashMap<String, String>();
241 long dataSourceId = -1;
242 try {
244 params.put(
Server.
Schema.IMAGE_ID.toString(), Long.toString(dataSourceId));
246 logger.log(Level.SEVERE,
"Could not get data source id to properly index the file " + af.
getId());
//NON-NLS
247 params.put(
Server.
Schema.IMAGE_ID.toString(), Long.toString(-1));
248 }
249
251 return params;
252 }
253 }
254
255
271 void ingest(ContentStream cs, Map<String, String> fields, final long size) throws IngesterException {
272
273 if (fields.get(
Server.
Schema.IMAGE_ID.toString()) == null) {
274 //skip the file, image id unknown
275 String msg = NbBundle.getMessage(this.getClass(),
276 "Ingester.ingest.exception.unknownImgId.msg", cs.getName());
277 logger.log(Level.SEVERE, msg);
278 throw new IngesterException(msg);
279 }
280
281 final byte[] docChunkContentBuf = new byte[MAX_DOC_CHUNK_SIZE];
282 SolrInputDocument updateDoc = new SolrInputDocument();
283
284 for (String key : fields.keySet()) {
285 updateDoc.addField(key, fields.get(key));
286 }
287
288 //using size here, but we are no longer ingesting entire files
289 //size is normally a chunk size, up to 1MB
290
291 if (size > 0) {
292
293 InputStream is = null;
294 int read = 0;
295 try {
296 is = cs.getStream();
297 read = is.read(docChunkContentBuf);
298 } catch (IOException ex) {
299 throw new IngesterException(
300 NbBundle.getMessage(this.getClass(), "Ingester.ingest.exception.cantReadStream.msg",
301 cs.getName()));
302 } finally {
303 try {
304 is.close();
305 } catch (IOException ex) {
306 logger.log(Level.WARNING, "Could not close input stream after reading content, " + cs.getName(), ex); //NON-NLS
307 }
308 }
309
310 if (read != 0) {
311 String s = "";
312 try {
313 s = new String(docChunkContentBuf, 0, read, docContentEncoding);
314 } catch (UnsupportedEncodingException ex) {
315 Exceptions.printStackTrace(ex);
316 }
317 updateDoc.addField(Server.Schema.CONTENT.toString(), s);
318 } else {
319 updateDoc.addField(Server.Schema.CONTENT.toString(), "");
320 }
321 }
322 else {
323 //no content, such as case when 0th chunk indexed
324 updateDoc.addField(Server.Schema.CONTENT.toString(), "");
325 }
326
327
328 try {
329 //TODO consider timeout thread, or vary socket timeout based on size of indexed content
330 solrServer.addDocument(updateDoc);
331 uncommitedIngests = true;
332 } catch (KeywordSearchModuleException ex) {
333 throw new IngesterException(
334 NbBundle.getMessage(this.getClass(), "Ingester.ingest.exception.err.msg", cs.getName()), ex);
335 }
336
337
338 }
339
352 private void ingestExtract(ContentStream cs, Map<String, String> fields, final long size) throws IngesterException {
353 final ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update/extract"); //NON-NLS
354 up.addContentStream(cs);
355 setFields(up, fields);
356 up.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
357
358 final String contentType = cs.getContentType();
359 if (contentType != null && !contentType.trim().equals("")) {
360 up.setParam("stream.contentType", contentType); //NON-NLS
361 }
362
363 //logger.log(Level.INFO, "Ingesting " + fields.get("file_name"));
364 up.setParam("commit", "false"); //NON-NLS
365
366 final Future<?> f = upRequestExecutor.submit(new UpRequestTask(up));
367
368 try {
369 f.get(getTimeout(size), TimeUnit.SECONDS);
370 } catch (TimeoutException te) {
371 logger.log(Level.WARNING, "Solr timeout encountered, trying to restart Solr"); //NON-NLS
372 //restart may be needed to recover from some error conditions
373 hardSolrRestart();
374 throw new IngesterException(
375 NbBundle.getMessage(this.getClass(), "Ingester.ingestExtract.exception.solrTimeout.msg",
376 fields.get("id"), fields.get("file_name"))); //NON-NLS
377 } catch (Exception e) {
378 throw new IngesterException(
379 NbBundle.getMessage(this.getClass(), "Ingester.ingestExtract.exception.probPostToSolr.msg",
380 fields.get("id"), fields.get("file_name")), e); //NON-NLS
381 }
382 uncommitedIngests = true;
383 }
384
    /**
     * Best-effort full restart of the local Solr server: close core, stop,
     * start, reopen core. Each step's failure is logged but does not abort the
     * remaining steps, so a partially-dead server can still be brought back.
     */
    private void hardSolrRestart() {
        try {
            solrServer.closeCore();
        } catch (KeywordSearchModuleException ex) {
            logger.log(Level.WARNING, "Cannot close core", ex); //NON-NLS
        }

        solrServer.stop();

        try {
            solrServer.start();
        } catch (KeywordSearchModuleException ex) {
            logger.log(Level.WARNING, "Cannot start", ex); //NON-NLS
        } catch (SolrServerNoPortException ex) {
            logger.log(Level.WARNING, "Cannot start server with this port", ex); //NON-NLS
        }

        try {
            solrServer.openCore();
        } catch (KeywordSearchModuleException ex) {
            logger.log(Level.WARNING, "Cannot open core", ex); //NON-NLS
        }
    }
411
418 static int getTimeout(long size) {
419 if (size < 1024 * 1024L) //1MB
420 {
421 return 60;
422 } else if (size < 10 * 1024 * 1024L) //10MB
423 {
424 return 1200;
425 } else if (size < 100 * 1024 * 1024L) //100MB
426 {
427 return 3600;
428 } else {
429 return 3 * 3600;
430 }
431
432 }
433
435
436 ContentStreamUpdateRequest up;
437
439 this.up = up;
440 }
441
442 @Override
444 try {
445 up.setMethod(METHOD.POST);
446 solrServer.request(up);
448 throw new RuntimeException(
449 NbBundle.getMessage(this.getClass(), "Ingester.UpReqestTask.run.exception.sorlNotAvail.msg"), ex);
450 } catch (IllegalStateException ex) {
451 // problems with content
452 throw new RuntimeException(
453 NbBundle.getMessage(this.getClass(), "Ingester.UpRequestTask.run.exception.probReadFile.msg"), ex);
454 } catch (SolrServerException ex) {
455 // If there's a problem talking to Solr, something is fundamentally
456 // wrong with ingest
457 throw new RuntimeException(
458 NbBundle.getMessage(this.getClass(), "Ingester.UpRequestTask.run.exception.solrProb.msg"), ex);
459 } catch (SolrException ex) {
460 // Tika problems result in an unchecked SolrException
461 ErrorCode ec = ErrorCode.getErrorCode(ex.code());
462
463 // When Tika has problems with a document, it throws a server error
464 // but it's okay to continue with other documents
465 if (ec.equals(ErrorCode.SERVER_ERROR)) {
466 throw new RuntimeException(NbBundle.getMessage(this.getClass(),
467 "Ingester.UpRequestTask.run.exception.probPostToSolr.msg",
468 ec),
469 ex);
470 } else {
471 // shouldn't get any other error codes
472 throw ex;
473 }
474 }
475
476 }
477 }
478
    /**
     * Commits all pending document additions to the Solr index and clears the
     * uncommitted flag. Failures are logged rather than thrown, so callers
     * cannot distinguish a failed commit; the flag is only cleared on success.
     */
    void commit() {
        try {
            solrServer.commit();
            uncommitedIngests = false;
        } catch (NoOpenCoreException ex) {
            logger.log(Level.WARNING, "Error commiting index", ex); //NON-NLS
        } catch (SolrServerException ex) {
            logger.log(Level.WARNING, "Error commiting index", ex); //NON-NLS
        }
    }
493
500 private static void setFields(ContentStreamUpdateRequest up, Map<String, String> fields) {
501 for (Entry<String, String> field : fields.entrySet()) {
502 up.setParam("literal." + field.getKey(), field.getValue()); //NON-NLS
503 }
504 }
505
510
512
515 }
516
517 @Override
520 }
521
522 @Override
524 return NbBundle.getMessage(this.getClass(),
"Ingester.FscContentStream.getSrcInfo", f.
getId());
525 }
526
527 @Override
529 return null;
530 }
531
532 @Override
535 }
536
537 @Override
540 }
541
542 @Override
544 throw new UnsupportedOperationException(
545 NbBundle.getMessage(this.getClass(), "Ingester.FscContentStream.getReader"));
546 }
547 }
548
553
555
557 this.aContent = aContent;
558 }
559
560 @Override
563 }
564
565 @Override
567 return NbBundle.getMessage(this.getClass(),
"Ingester.NullContentStream.getSrcInfo.text", aContent.
getId());
568 }
569
570 @Override
572 return null;
573 }
574
575 @Override
577 return 0L;
578 }
579
580 @Override
582 return new ByteArrayInputStream(new byte[0]);
583 }
584
585 @Override
587 throw new UnsupportedOperationException(
588 NbBundle.getMessage(this.getClass(), "Ingester.NullContentStream.getReader"));
589 }
590 }
591
596 static class IngesterException extends Exception {
597
598 IngesterException(String message, Throwable ex) {
599 super(message, ex);
600 }
601
602 IngesterException(String message) {
603 super(message);
604 }
605 }
606 }
Map< String, String > visit(Directory d)
Map< String, String > defaultVisit(Content cntnt)
Map< String, String > visit(DerivedFile df)
static String getStringTimeISO8601(long epochSeconds, TimeZone tzone)
Map< String, String > visit(File f)
Map< String, String > getCommonFields(AbstractFile af)
Map< String, String > getCommonFileContentFields(Map< String, String > params, AbstractFile file)
Map< String, String > visit(LocalFile lf)
Map< String, String > visit(LayoutFile lf)