# This file is part of Pimlico
# Copyright (C) 2016 Mark Granroth-Wilding
# Licensed under the GNU GPL v3.0 - http://www.gnu.org/licenses/gpl-3.0.en.html
"""
Input datatype for extracting documents from XML files. Gigaword, for example, is stored in this way.
Depends on BeautifulSoup (see "bs4" target in lib dir Makefile).
DEPRECATED: Use input module :mod:`pimlico.modules.input.xml` instead. Input datatypes are being phased out.
"""
import gzip
from gzip import GzipFile
import os
from multiprocessing import Pool
import time
from pimlico.core.dependencies.python import PythonPackageOnPip
from pimlico.core.modules.options import comma_separated_strings
from pimlico.datatypes.base import IterableCorpus, PimlicoDatatypeWriter
from pimlico.datatypes.documents import RawTextDocumentType
from pimlico.utils.filesystem import retry_open
from pimlico.utils.progress import get_progress_bar
__all__ = ["XmlDocumentIterator"]


class XmlDocumentIterator(IterableCorpus):
    requires_data_preparation = True
    input_module_options = {
        "path": {
            "required": True,
            "help": "Path to the data",
        },
        "document_node_type": {
            "help": "XML node type to extract documents from (default: 'doc')",
            "default": "doc",
        },
        "document_name_attr": {
            "help": "Attribute of document nodes to get document name from (default: 'id')",
            "default": "id",
        },
        "filter_on_doc_attr": {
            "help": "Comma-separated list of key=value constraints. If given, only docs with the attribute 'key' on "
                    "their doc node and the attribute value 'value' will be included",
            "type": comma_separated_strings,
        },
        "truncate": {
            "help": "Stop reading once we've got this number of documents",
            "type": int,
        },
    }
    data_point_type = RawTextDocumentType

    def __init__(self, *args, **kwargs):
        super(XmlDocumentIterator, self).__init__(*args, **kwargs)
        self.attr_constraints = dict(key_val.split("=") for key_val in self.options["filter_on_doc_attr"]) \
            if self.options.get("filter_on_doc_attr", None) is not None else None

    def __iter__(self):
        path = self.options["path"]
        if not os.path.isdir(path):
            # Just a single file
            files = [(os.path.dirname(os.path.abspath(path)), os.path.basename(path))]
        else:
            # Directory: iterate over all files
            files = [
                (dirname, filename)
                for (dirname, dirs, filenames) in os.walk(path)
                for filename in filenames
            ]

        n_docs = 0
        try:
            for dirname, filename in files:
                path = os.path.join(dirname, filename)
                for doc_node in get_doc_nodes(path, self.options["document_node_type"], self.attr_constraints):
                    # The node should supply us with the document name, using the attribute name specified
                    doc_name = doc_node.get(self.options["document_name_attr"])
                    # Pull the text out of the document node
                    doc_text = doc_node.text
                    yield doc_name, doc_text

                    n_docs += 1
                    if self.options["truncate"] is not None and n_docs >= self.options["truncate"]:
                        raise TruncateNow()
        except TruncateNow:
            pass
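
    # Illustrative use (a sketch; in practice the pipeline drives iteration, and the
    # constructor arguments here are hypothetical):
    #   corpus = XmlDocumentIterator(...)
    #   for doc_name, doc_text in corpus:
    #       ...  # each item is the (name, raw text) pair pulled from one document node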

    def get_software_dependencies(self):
        return super(XmlDocumentIterator, self).get_software_dependencies() + [
            BeautifulSoupDependency()
        ]

    def prepare_data(self, output_dir, log):
        path = self.options["path"]
        # Count the number of documents in the corpus
        # Iterate over the corpus in a similar way to the datatype that reads documents, but just count the DOC tags
        if not os.path.isdir(path):
            # Just a single file
            files = [os.path.abspath(path)]
        else:
            # Directory: iterate over all files
            files = [
                os.path.join(dirname, filename)
                for (dirname, dirs, filenames) in os.walk(os.path.abspath(path))
                for filename in filenames
            ]

        if self.pipeline.processes > 1:
            processes_message = ", using %d parallel processes" % self.pipeline.processes
        else:
            processes_message = ""
        log.info("Counting docs in %s files%s" % (len(files), processes_message))
        num_docs = count_files_parallel(
            files, self.options["truncate"], self.options["document_node_type"],
            self.pipeline.processes, self.attr_constraints
        )
        log.info("Counted %d docs" % num_docs)

        with PimlicoDatatypeWriter(output_dir) as datatype:
            datatype.metadata["length"] = num_docs

    def data_ready(self):
        # Check that the input path exists and that we've prepared the data to get the length
        return super(XmlDocumentIterator, self).data_ready() and \
            os.path.exists(self.options["path"]) and \
            "length" in self.metadata


def count_files_parallel(filenames, truncate, document_node_type, processes, attr_constraints):
    if truncate is not None:
        # Truncating, so update pbar with num docs and stop after we've got enough
        pbar = get_progress_bar(truncate, title="Counting documents")
    else:
        # Otherwise, don't know how many docs there are (that's the whole point), so count input files
        pbar = get_progress_bar(len(filenames), title="Counting documents")

    pool = Pool(processes=processes)
    jobs = []
    for filename in filenames:
        # Set a job going to count the docs in this file
        jobs.append(pool.apply_async(count_files_process, args=(filename, document_node_type, attr_constraints)))
    pool.close()

    count = counted_files = 0
    # Wait for results to come through
    while counted_files < len(filenames):
        # Wait a bit and check again
        time.sleep(1.)
        completed_jobs = []
        for job in jobs:
            if job.ready():
                # This will raise any exception that occurred in the process
                new_count = job.get()
                # Stop checking this job now
                completed_jobs.append(job)
                # Update our counts
                counted_files += 1
                count += new_count
                if truncate is not None:
                    if count >= truncate:
                        # We've got at least as many docs as requested, so we'll stop after that number when iterating
                        pbar.finish()
                        # Stop all workers
                        pool.terminate()
                        return truncate
                    else:
                        pbar.update(count)
                else:
                    pbar.update(counted_files)
        # Stop checking on the completed jobs
        for job in completed_jobs:
            jobs.remove(job)
    pbar.finish()
    # Double-check that everything's finished
    pool.join()
    return count


def count_files_process(filename, document_node_type, attr_constraints):
    return len(get_doc_nodes(filename, document_node_type, attr_constraints))


def get_doc_nodes(filename, document_node_type, attr_constraints):
    if attr_constraints is None:
        attr_constraints = {}
    safe_import_bs4()
    from bs4 import BeautifulSoup

    if filename.endswith(".gz"):
        # Permit gzip files by opening them using the gzip library
        with GzipFile(filename, fileobj=retry_open(filename, mode="rb")) as f:
            data = f.read()
    else:
        with retry_open(filename, mode="r") as f:
            data = f.read()

    # Read the XML using Beautiful Soup, so we can handle messy XML in a tolerant fashion
    soup = BeautifulSoup(data, "lxml")
    # Look for the type of XML node that documents are stored in and return all matching nodes
    return soup.find_all(document_node_type, attrs=attr_constraints)
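
# For example (a sketch with hypothetical data): given a file containing
#   <doc id="d1" type="story">Some text</doc>
#   <doc id="d2" type="other">More text</doc>
# get_doc_nodes(path, "doc", {"type": "story"}) would return just the first node,
# from which the caller reads doc_node.get("id") ("d1") and doc_node.text ("Some text")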


def safe_import_bs4():
    # BS can go very slowly if it tries to use chardet to detect input encoding
    # Register chardet and cchardet as None in sys.modules, so that importing them fails and BS doesn't try to use them
    # This prevents it getting stuck on reading long input files
    import sys
    sys.modules["cchardet"] = None
    sys.modules["chardet"] = None
    # Now we can import BS
    import bs4
    return bs4
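

# Raised inside XmlDocumentIterator.__iter__ to break out of the nested file/document
# loops once the requested truncate limit has been reached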
class TruncateNow(StopIteration):
    pass


class BeautifulSoupDependency(PythonPackageOnPip):
    """
    Test import with special BS import behaviour.
    """
    def __init__(self):
        super(BeautifulSoupDependency, self).__init__("bs4", pip_package="beautifulsoup4")

    def import_package(self):
        return safe_import_bs4()