# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html
"""
Input module for extracting documents from XML files. Gigaword, for example, is distributed in this format.
Depends on BeautifulSoup (see the "bs4" target in the lib dir Makefile), which is used with its "lxml" parser,
so lxml must also be installed.
"""
import gzip
from gzip import GzipFile
import os
from multiprocessing import Pool
import time
from pimlico.core.dependencies.python import PythonPackageOnPip
from pimlico.core.modules.options import comma_separated_strings
from pimlico.datatypes.base import IterableCorpus, PimlicoDatatypeWriter
from pimlico.datatypes.documents import RawTextDocumentType
from pimlico.utils.filesystem import retry_open
from pimlico.utils.progress import get_progress_bar
# Public API of this module: only the input datatype is exported by "from ... import *"
__all__ = ["XmlDocumentIterator"]
class XmlDocumentIterator(IterableCorpus):
    """
    Iterable corpus whose documents are read from XML files.

    Each document is an XML node of type ``document_node_type``. Its name is taken from the node
    attribute ``document_name_attr`` and its text is the node's text content. ``path`` may be a
    single file or a directory, in which case every file found beneath it is read. Data
    preparation counts the documents, so that the corpus length is known before iterating.
    """
    requires_data_preparation = True
    input_module_options = {
        "path": {
            "required": True,
            "help": "Path to the data",
        },
        "document_node_type": {
            "help": "XML node type to extract documents from (default: 'doc')",
            "default": "doc",
        },
        "document_name_attr": {
            "help": "Attribute of document nodes to get document name from (default: 'id')",
            "default": "id",
        },
        "filter_on_doc_attr": {
            "help": "Comma-separated list of key=value constraints. If given, only docs with the attribute 'key' on "
                    "their doc node and the attribute value 'value' will be included",
            "type": comma_separated_strings,
        },
        "truncate": {
            "help": "Stop reading once we've got this number of documents",
            "type": int,
        },
    }
    data_point_type = RawTextDocumentType

    def __init__(self, *args, **kwargs):
        super(XmlDocumentIterator, self).__init__(*args, **kwargs)
        # Turn the "key=value" strings into a dict of attribute constraints on document nodes
        # (None means "no filtering")
        constraint_strings = self.options.get("filter_on_doc_attr", None)
        if constraint_strings is None:
            self.attr_constraints = None
        else:
            self.attr_constraints = {}
            for key_val in constraint_strings:
                if "=" not in key_val:
                    raise ValueError("filter_on_doc_attr constraints must be of the form key=value, got %r" % key_val)
                # Split only on the first "=", so that attribute values may themselves contain "="
                key, val = key_val.split("=", 1)
                self.attr_constraints[key] = val

    def _input_files(self):
        """
        Return the absolute paths of all input files.

        If ``path`` is a file, this is just that file. Otherwise it is every file found by walking
        the ``path`` directory. Iteration and data preparation both use this method, so they
        always read the same files in the same order.
        """
        path = os.path.abspath(self.options["path"])
        if not os.path.isdir(path):
            # Just a single file
            return [path]
        # Directory: iterate over all files
        return [
            os.path.join(dirname, filename)
            for (dirname, _, filenames) in os.walk(path)
            for filename in filenames
        ]

    def __iter__(self):
        """
        Yield ``(doc_name, doc_text)`` for every document in the input files.

        If the ``truncate`` option is set, stop after that many documents. No further files are
        read once the limit is reached.
        """
        truncate = self.options.get("truncate", None)
        if truncate is not None and truncate <= 0:
            # A limit of zero means no documents at all (matching the length counted in prepare_data)
            return
        n_docs = 0
        for path in self._input_files():
            for doc_node in get_doc_nodes(path, self.options["document_node_type"], self.attr_constraints):
                # The node should supply us with the document name, using the attribute name specified
                doc_name = doc_node.get(self.options["document_name_attr"])
                # Pull the text out of the document node
                yield doc_name, doc_node.text
                n_docs += 1
                if truncate is not None and n_docs >= truncate:
                    return

    def get_software_dependencies(self):
        return super(XmlDocumentIterator, self).get_software_dependencies() + [
            BeautifulSoupDependency()
        ]

    def prepare_data(self, output_dir, log):
        """
        Count the documents in the input files and store the count as the corpus length.

        Counting is parallelized over ``self.pipeline.processes`` worker processes. The count is
        capped at the ``truncate`` option, if given, and written as ``length`` in the metadata.
        """
        # Iterate over the corpus in a similar way to the datatype that reads documents, but just count the doc nodes
        files = self._input_files()
        processes = self.pipeline.processes
        if processes > 1:
            processes_message = ", using %d parallel processes" % processes
        else:
            processes_message = ""
        log.info("Counting docs in %s files%s" % (len(files), processes_message))
        num_docs = count_files_parallel(
            files, self.options.get("truncate", None), self.options["document_node_type"],
            processes, self.attr_constraints
        )
        log.info("Counted %d docs" % num_docs)
        with PimlicoDatatypeWriter(output_dir) as datatype:
            datatype.metadata["length"] = num_docs

    def data_ready(self):
        # Check that the input path exists and that we've prepared the data to get the length
        return super(XmlDocumentIterator, self).data_ready() and \
            os.path.exists(self.options["path"]) and \
            "length" in self.metadata
def count_files_parallel(filenames, truncate, document_node_type, processes, attr_constraints):
    """
    Count the matching document nodes in a set of XML files, using a pool of worker processes.

    :param filenames: paths of the files to count documents in (gzipped if the name ends in ".gz")
    :param truncate: if not None, stop as soon as at least this many documents have been counted
        and return ``truncate`` itself
    :param document_node_type: XML node type that holds each document
    :param processes: number of worker processes to use
    :param attr_constraints: dict of attribute constraints that document nodes must satisfy, or None
    :return: total number of matching documents, capped at ``truncate`` if given
    :raises: any exception raised while counting a file in a worker process
    """
    from functools import partial

    if truncate is not None:
        # Truncating, so update pbar with num docs and stop after we've got enough
        pbar = get_progress_bar(truncate, title="Counting documents")
    else:
        # Otherwise, don't know how many docs there are (that's the whole point) so count input files
        pbar = get_progress_bar(len(filenames), title="Counting documents")

    # A partial of a module-level function can be pickled and sent to the worker processes
    count_one_file = partial(count_files_process,
                             document_node_type=document_node_type, attr_constraints=attr_constraints)
    pool = Pool(processes=processes)
    all_done = False
    count = 0
    try:
        # Results arrive as soon as each file has been counted, in completion order. Any exception
        # raised in a worker is re-raised here.
        for counted_files, new_count in enumerate(pool.imap_unordered(count_one_file, filenames), 1):
            count += new_count
            if truncate is not None:
                if count >= truncate:
                    # We've got at least as many docs as requested, so we'll stop after that number when iterating
                    return truncate
                pbar.update(count)
            else:
                pbar.update(counted_files)
        all_done = True
    finally:
        if all_done:
            # Every job has finished: let the workers exit normally
            pool.close()
        else:
            # Stopped early (truncation) or a worker failed: kill any outstanding jobs
            pool.terminate()
        # Always wait for the workers to exit, so that no processes are left behind
        pool.join()
        pbar.finish()
    return count
def count_files_process(filename, document_node_type, attr_constraints):
    """
    Worker task: return how many matching document nodes one XML file contains.

    Arguments are passed straight through to :func:`get_doc_nodes`.
    """
    matching_nodes = get_doc_nodes(filename, document_node_type, attr_constraints)
    return len(matching_nodes)
def get_doc_nodes(filename, document_node_type, attr_constraints):
    """
    Parse an XML file and return its document nodes.

    The file is parsed with BeautifulSoup's "lxml" parser, so messy XML is handled tolerantly.
    Files whose names end in ".gz" are decompressed with gzip first.

    :param filename: path of the XML file
    :param document_node_type: XML node type that holds each document
    :param attr_constraints: dict mapping attribute name to the value a node must have to be
        included, or None to include every node of the given type
    :return: the matching nodes, as returned by BeautifulSoup's ``find_all``
    """
    if attr_constraints is None:
        attr_constraints = {}
    safe_import_bs4()
    from bs4 import BeautifulSoup

    if filename.endswith(".gz"):
        # Permit gzip files by opening them using the gzip library. GzipFile.close() does not close
        # a fileobj passed to it, so the raw file gets its own context manager; otherwise a file
        # handle would leak for every compressed file read.
        with retry_open(filename, mode="rb") as raw_file:
            with GzipFile(filename, fileobj=raw_file) as f:
                data = f.read()
    else:
        with retry_open(filename, mode="r") as f:
            data = f.read()
    # Read the XML using Beautiful Soup, so we can handle messy XML in a tolerant fashion
    soup = BeautifulSoup(data, "lxml")
    # Look for the type of XML node that documents are stored in
    return soup.find_all(document_node_type, attrs=attr_constraints)
def safe_import_bs4():
    """
    Import and return the ``bs4`` module with charset auto-detection disabled.

    BeautifulSoup can be very slow (and appear to hang on long input files) when it uses chardet
    or cchardet to guess the input encoding. Mapping both module names to None in ``sys.modules``
    makes any later import of them fail, so BeautifulSoup falls back to not using them. This
    affects the whole process, not just BeautifulSoup.
    """
    import sys
    for blocked_module in ("cchardet", "chardet"):
        sys.modules[blocked_module] = None
    # Only import BS once the detectors have been blocked
    import bs4
    return bs4
class TruncateNow(StopIteration):
    """
    Signal that the document limit given by the "truncate" option has been reached, so reading
    can stop early.
    """
class BeautifulSoupDependency(PythonPackageOnPip):
    """
    Dependency on BeautifulSoup: imported as "bs4", installed from pip as "beautifulsoup4".

    The import test goes through :func:`safe_import_bs4`, so chardet/cchardet are blocked just as
    they are when documents are actually parsed.
    """
    def __init__(self):
        module_name, pip_name = "bs4", "beautifulsoup4"
        super(BeautifulSoupDependency, self).__init__(module_name, pip_package=pip_name)

    def import_package(self):
        bs4_module = safe_import_bs4()
        return bs4_module