Source code for pimlico.datatypes.xml

# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html

"""
Input datatype for extracting documents from XML files. Gigaword, for example, is stored in this way.

Depends on BeautifulSoup (see "bs4" target in lib dir Makefile).

DEPRECATED: Use input module :mod:`pimlico.modules.input.xml` instead. Input datatypes are being phased out.

"""
import os
import time
from gzip import GzipFile
from multiprocessing import Pool
from pimlico.core.dependencies.python import PythonPackageOnPip
from pimlico.core.modules.options import comma_separated_strings
from pimlico.datatypes.base import IterableCorpus, PimlicoDatatypeWriter
from pimlico.datatypes.documents import RawTextDocumentType
from pimlico.utils.filesystem import retry_open
from pimlico.utils.progress import get_progress_bar


__all__ = ["XmlDocumentIterator"]


class XmlDocumentIterator(IterableCorpus):
    requires_data_preparation = True
    input_module_options = {
        "path": {
            "required": True,
            "help": "Path to the data",
        },
        "document_node_type": {
            "help": "XML node type to extract documents from (default: 'doc')",
            "default": "doc",
        },
        "document_name_attr": {
            "help": "Attribute of document nodes to get document name from (default: 'id')",
            "default": "id",
        },
        "filter_on_doc_attr": {
            "help": "Comma-separated list of key=value constraints. If given, only docs with the attribute 'key' on "
                    "their doc node and the attribute value 'value' will be included",
            "type": comma_separated_strings,
        },
        "truncate": {
            "help": "Stop reading once we've got this number of documents",
            "type": int,
        },
    }
    data_point_type = RawTextDocumentType

    def __init__(self, *args, **kwargs):
        super(XmlDocumentIterator, self).__init__(*args, **kwargs)
        self.attr_constraints = dict(key_val.split("=") for key_val in self.options["filter_on_doc_attr"]) \
            if self.options.get("filter_on_doc_attr", None) is not None else None

    def __iter__(self):
        path = self.options["path"]

        if not os.path.isdir(path):
            # Just a single file
            files = [(os.path.dirname(os.path.abspath(path)), os.path.basename(path))]
        else:
            # Directory: iterate over all files
            files = [
                (dirname, filename)
                for (dirname, dirs, filenames) in os.walk(path)
                for filename in filenames
            ]

        n_docs = 0
        try:
            for dirname, filename in files:
                path = os.path.join(dirname, filename)
                for doc_node in get_doc_nodes(path, self.options["document_node_type"], self.attr_constraints):
                    # The node should supply us with the document name, using the attribute name specified
                    doc_name = doc_node.get(self.options["document_name_attr"])
                    # Pull the text out of the document node
                    doc_text = doc_node.text
                    yield doc_name, doc_text

                    n_docs += 1
                    if self.options["truncate"] is not None and n_docs >= self.options["truncate"]:
                        raise TruncateNow()
        except TruncateNow:
            pass
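
    # Note (illustrative, not in the original source): with filter_on_doc_attr given as,
    # say, "type=story,lang=en", the comma_separated_strings option type yields
    # ["type=story", "lang=en"], which __init__ above turns into the dict
    # {"type": "story", "lang": "en"}, later passed to BeautifulSoup as attribute constraints.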

    def get_software_dependencies(self):
        return super(XmlDocumentIterator, self).get_software_dependencies() + [
            BeautifulSoupDependency()
        ]

    def prepare_data(self, output_dir, log):
        path = self.options["path"]
        # Count the number of documents in the corpus
        # Iterate over the corpus in a similar way to the datatype that reads documents, but just count the DOC tags
        if not os.path.isdir(path):
            # Just a single file
            files = [os.path.abspath(path)]
        else:
            # Directory: iterate over all files
            files = [
                os.path.join(dirname, filename)
                for (dirname, dirs, filenames) in os.walk(os.path.abspath(path))
                for filename in filenames
            ]

        if self.pipeline.processes > 1:
            processes_message = ", using %d parallel processes" % self.pipeline.processes
        else:
            processes_message = ""
        log.info("Counting docs in %s files%s" % (len(files), processes_message))
        num_docs = count_files_parallel(
            files, self.options["truncate"], self.options["document_node_type"],
            self.pipeline.processes, self.attr_constraints
        )
        log.info("Counted %d docs" % num_docs)

        with PimlicoDatatypeWriter(output_dir) as datatype:
            datatype.metadata["length"] = num_docs

    def data_ready(self):
        # Check that the input path exists and that we've prepared the data to get the length
        return super(XmlDocumentIterator, self).data_ready() and \
            os.path.exists(self.options["path"]) and \
            "length" in self.metadata


def count_files_parallel(filenames, truncate, document_node_type, processes, attr_constraints):
    if truncate is not None:
        # Truncating, so update pbar with num docs and stop after we've got enough
        pbar = get_progress_bar(truncate, title="Counting documents")
    else:
        # Otherwise, don't know how many docs there are (that's the whole point) so count input files
        pbar = get_progress_bar(len(filenames), title="Counting documents")

    pool = Pool(processes=processes)
    jobs = []
    for filename in filenames:
        # Set a job going to count the docs in this file
        jobs.append(pool.apply_async(count_files_process, args=(filename, document_node_type, attr_constraints)))
    pool.close()

    count = counted_files = 0
    # Wait for results to come through
    while counted_files < len(filenames):
        # Wait a bit and check again
        time.sleep(1.)
        completed_jobs = []
        for job in jobs:
            if job.ready():
                # This will raise any exception that occurred in the process
                new_count = job.get()
                # Stop checking this job now
                completed_jobs.append(job)
                # Update our counts
                counted_files += 1
                count += new_count

                if truncate is not None:
                    if count >= truncate:
                        # We've got at least as many docs as requested, so we'll stop after that number when iterating
                        pbar.finish()
                        # Stop all workers
                        pool.terminate()
                        return truncate
                    else:
                        pbar.update(count)
                else:
                    pbar.update(counted_files)
        # Stop checking on the completed jobs
        for job in completed_jobs:
            jobs.remove(job)

    pbar.finish()
    # Double-check that everything's finished
    pool.join()
    return count


def count_files_process(filename, document_node_type, attr_constraints):
    return len(get_doc_nodes(filename, document_node_type, attr_constraints))


def get_doc_nodes(filename, document_node_type, attr_constraints):
    if attr_constraints is None:
        attr_constraints = {}
    safe_import_bs4()
    from bs4 import BeautifulSoup

    if filename.endswith(".gz"):
        # Permit gzip files by opening them using the gzip library
        with GzipFile(filename, fileobj=retry_open(filename, mode="rb")) as f:
            data = f.read()
    else:
        with retry_open(filename, mode="r") as f:
            data = f.read()

    # Read the XML using Beautiful Soup, so we can handle messy XML in a tolerant fashion
    soup = BeautifulSoup(data, "lxml")
    # Look for the type of XML node that documents are stored in and return the matching nodes
    return soup.find_all(document_node_type, attrs=attr_constraints)


def safe_import_bs4():
    # BS can go very slowly if it tries to use chardet to detect input encoding
    # Remove chardet and cchardet from the Python modules, so that import fails and it doesn't try to use them
    # This prevents it getting stuck on reading long input files
    import sys
    sys.modules["cchardet"] = None
    sys.modules["chardet"] = None
    # Now we can import BS
    import bs4
    return bs4


class TruncateNow(StopIteration):
    pass


class BeautifulSoupDependency(PythonPackageOnPip):
    """
    Test import with special BS import behaviour.
    """
    def __init__(self):
        super(BeautifulSoupDependency, self).__init__("bs4", pip_package="beautifulsoup4")

    def import_package(self):
        return safe_import_bs4()
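

# Usage sketch, not from the original source: get_doc_nodes() above can be called directly to
# pull document nodes out of a single Gigaword-style file. The file path and the "type=story"
# constraint are invented for illustration; the function returns the BeautifulSoup tags whose
# name matches document_node_type and whose attributes satisfy attr_constraints (pass None for
# no filtering).
#
#   doc_nodes = get_doc_nodes("/path/to/afp_eng_199405.xml.gz", "doc", {"type": "story"})
#   for node in doc_nodes:
#       print(node.get("id"), len(node.text))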