# This file is part of Pimlico
# Copyright (C) 2020 Mark Granroth-Wilding
# Licensed under the GNU LGPL v3.0 - https://www.gnu.org/licenses/lgpl-3.0.en.html
"""
File collections and files.
There used to be an UnnamedFileCollection, which has been removed in the move to the
new datatype system. It used to be used mostly for input datatypes, which don't exist
any more. There may still be a use for this, though, so it may be added back in future.
"""
import io
import os
from collections import OrderedDict
from builtins import object
from pimlico.core.modules.options import comma_separated_strings
from pimlico.datatypes import PimlicoDatatype, DynamicInputDatatypeRequirement
from pimlico.utils.core import cached_property
# Public names exported by a star-import of this module
__all__ = ["NamedFileCollection", "NamedFile", "FilesInput", "FileInput", "TextFile"]
class NamedFileCollection(PimlicoDatatype):
    """
    Datatype that stores a fixed collection of files, which have fixed names
    (or at least names that can be determined from the class). Very many datatypes fall into this category.
    Overriding this base class provides them with some common functionality, including the possibility of
    creating a union of multiple datatypes.

    The datatype option ``filenames`` should specify a list of filenames contained by the datatype.

    For typechecking, the provided type must have at least all the filenames of the type requirement,
    though it may include more.

    All files are contained in the datatype's data directory. If files are stored in subdirectories, this may
    be specified in the list of filenames using ``/`` s. (Always use forward slashes, regardless of the operating
    system.)

    """
    datatype_name = "named_file_collection"
    datatype_options = OrderedDict([
        ("filenames", {
            "type": comma_separated_strings,
            "help": "Filenames contained in the collection",
            "default": [],
        })
    ])
    datatype_supports_python2 = True

    def __init__(self, *args, **kwargs):
        super(NamedFileCollection, self).__init__(*args, **kwargs)
        # Keep the processed option value on the instance for convenient access
        self.filenames = self.options["filenames"]

    def check_type(self, supplied_type):
        """
        Check the supplied type against this type requirement.

        :param supplied_type: datatype instance being offered; its ``filenames`` attribute is read
        :return: False if the base class check fails or if any of this type's filenames is
            missing from the supplied type's filenames; True otherwise
        """
        if not super(NamedFileCollection, self).check_type(supplied_type):
            return False
        # Additionally check the filenames:
        # all of our filenames should be included in the supplied type.
        # If there are others too, that's fine
        return all(reqd_fn in supplied_type.filenames for reqd_fn in self.filenames)

    def browse_file(self, reader, filename):
        """
        Return text for a particular file in the collection to show in the
        browser. By default, just reads in the file's data and returns
        it, but subclasses might want to override this (perhaps conditioned
        on the filename) to format the data readably.

        :param reader: reader for this datatype, used to read the file
        :param filename: name of the file within the collection
        :return: file data to show
        """
        return reader.read_file(filename)

    def run_browser(self, reader, opts):
        """
        All NamedFileCollections provide a browser that just lets you see a
        list of the files and view them, in the case of text files.

        Subclasses may override the way individual files are shown by
        overriding `browse_file()`.

        :param reader: reader passed on to the file browser
        :param opts: not used by this implementation
        """
        # Imported here rather than at module level, so the browser tools
        # are only loaded when the browser is actually run
        from pimlico.cli.browser.tools.files import browse_files
        browse_files(reader)

    # NOTE(review): Reader, Setup and Writer are declared with ``object`` as their base,
    # yet call methods via super() -- presumably the datatype machinery rebases them on
    # the parent datatype's corresponding classes. Confirm in PimlicoDatatype.
    class Reader(object):
        class Setup(object):
            def get_required_paths(self):
                """
                Paths that must exist for the data to be ready to read: those required
                by the parent setup, plus one per filename in the collection.

                :return: list of paths relative to the data dir
                """
                # Split on /s, so we use the filesystem's appropriate joiner for paths
                # Just returns the paths relative to the data dir: the base setup will handle checking for them
                return super(NamedFileCollection.Reader.Setup, self).get_required_paths() + [
                    os.path.join(*filename.split("/")) for filename in self.datatype.filenames
                ]

        def process_setup(self):
            """ Run the parent's setup processing, then copy the filenames from the datatype. """
            super(NamedFileCollection.Reader, self).process_setup()
            self.filenames = self.datatype.filenames

        def get_absolute_path(self, filename):
            """
            Get the absolute path to one of the collection's files.

            :param filename: one of the filenames of the collection, using ``/`` as
                the separator for any subdirectories
            :return: the path to the file within the data dir
            :raises ValueError: if the filename is not part of the collection
            """
            if filename not in self.filenames:
                raise ValueError("'{}' is not a filename in the file collection".format(filename))
            # Filenames always use forward slashes: split them so that the path is
            # built with the filesystem's own separator, as in get_required_paths()
            return os.path.join(self.data_dir, *filename.split("/"))

        @cached_property
        def absolute_paths(self):
            """ Absolute paths to all of the collection's files, in the order of ``filenames``. """
            return [self.get_absolute_path(f) for f in self.filenames]

        @property
        def absolute_filenames(self):
            """ For backwards compatibility: use absolute_paths by preference """
            return self.absolute_paths

        def read_file(self, filename=None, mode="r", text=False):
            """
            Read a file from the collection.

            :param filename: string filename, which should be one of the filenames specified for this
                collection; or an integer, in which case the ith file in the collection is read. If
                not given, the first file is read
            :param mode: mode to open the file in, passed on to `open_file()`
            :param text: if True, the file is treated as utf-8-encoded text and a unicode object is
                returned. Otherwise, a bytes object is returned.
            :return: the full contents of the file
            """
            with self.open_file(filename, mode=mode) as f:
                data = f.read()
            if text:
                return data.decode("utf-8")
            else:
                return data

        def read_files(self, mode="r", text=False):
            """
            Read every file in the collection using `read_file()`.

            :return: list of file contents, in the order of ``filenames``
            """
            return [self.read_file(f, mode=mode, text=text) for f in self.filenames]

        def open_file(self, filename=None, mode="r"):
            """
            Open a file from the collection for reading.

            :param filename: string filename from the collection; or an integer index into
                the collection's filenames. If not given, the first file is opened
            :param mode: mode passed on to the file object
            :return: an `OpenFileReader`, which may be used as a context manager
            """
            # By default, read the first file in the collection
            if filename is None:
                filename = self.filenames[0]
            elif isinstance(filename, int):
                # Allow an int to specify the ith filename
                filename = self.filenames[filename]
            return OpenFileReader(self, filename, mode=mode)

    class Writer(object):
        def __init__(self, *args, **kwargs):
            super(NamedFileCollection.Writer, self).__init__(*args, **kwargs)
            self.filenames = self.datatype.filenames
            # Make sure every file gets written
            for filename in self.filenames:
                self.require_tasks("write_%s" % filename)

        def write_file(self, filename, data, text=False):
            """
            Write data to one of the collection's files.

            If text=True, the data is expected to be unicode and is encoded as utf-8.
            Otherwise, data should be a bytes object.

            :param filename: one of the filenames of the collection
            :param data: the data to write to the file
            :param text: whether to encode the data as utf-8 before writing
            """
            if text:
                data = data.encode("utf-8")
            # Write the file: open_file() takes care of creating any subdirectory
            with self.open_file(filename) as f:
                f.write(data)

        def file_written(self, filename):
            """ Mark the given file as having been written, if write_file() was not used to write it. """
            self.task_complete("write_%s" % filename)

        def open_file(self, filename=None):
            """
            Open one of the collection's files for writing.

            :param filename: one of the filenames of the collection. If not given, the
                first file is used
            :return: an `OpenFileWriter`, which may be used as a context manager
            """
            if filename is None:
                filename = self.filenames[0]
            # The filename could contain a subdirectory, so check the dir exists.
            # This is done here, rather than only in write_file(), so that files
            # opened directly can also live in subdirectories
            file_dir = os.path.dirname(self.get_absolute_path(filename))
            if not os.path.exists(file_dir):
                os.makedirs(file_dir)
            return OpenFileWriter(self, filename)

        def get_absolute_path(self, filename=None):
            """
            Get the absolute path to one of the collection's files.

            :param filename: one of the filenames of the collection, using ``/`` as the
                separator for any subdirectories. If not given, the first file is used
            :return: the path to the file within the data dir
            :raises ValueError: if the filename is not part of the collection
            """
            if filename is None:
                filename = self.filenames[0]
            elif filename not in self.filenames:
                raise ValueError("'{}' is not a filename in the file collection".format(filename))
            # Filenames always use forward slashes: split them so that the path is
            # built with the filesystem's own separator
            return os.path.join(self.data_dir, *filename.split("/"))

        @cached_property
        def absolute_paths(self):
            """ Absolute paths to all of the collection's files, in the order of ``filenames``. """
            return [self.get_absolute_path(f) for f in self.filenames]
class OpenFileWriter(io.FileIO):
    """
    File object for writing one file of a collection, opened in ``"w"`` mode.

    When used as a context manager, the file is reported to the writer as
    written on exit, provided that no exception was raised inside the block.

    :param writer: object providing ``get_absolute_path(filename)`` and
        ``file_written(filename)``
    :param filename: name of the file, as understood by the writer
    """
    def __init__(self, writer, filename):
        self.writer = writer
        self.filename = filename
        super(OpenFileWriter, self).__init__(self.writer.get_absolute_path(self.filename), "w")

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Let the underlying file object finish up first
        super(OpenFileWriter, self).__exit__(exc_type, exc_val, exc_tb)
        # Only mark the file as written if the block completed without an error
        if exc_type is None:
            self.writer.file_written(self.filename)
class OpenFileReader(io.FileIO):
    """
    File object for reading one file of a collection.

    :param reader: object providing ``get_absolute_path(filename)``
    :param filename: name of the file, as understood by the reader
    :param mode: mode to open the file in, passed on to ``io.FileIO``
    """
    def __init__(self, reader, filename, mode="r"):
        self.reader = reader
        self.filename = filename
        super(OpenFileReader, self).__init__(self.reader.get_absolute_path(self.filename), mode=mode)
class NamedFile(NamedFileCollection):
    """
    Like NamedFileCollection, but always has exactly one file.

    The filename is given as the `filename` datatype option, which can also be given
    as the first init arg: `NamedFile("myfile.txt")`.

    Since NamedFile is a subtype of NamedFileCollection, it also has a "filenames" option.
    It is ignored if the `filename` option is given, and otherwise must have at most one
    item. If neither option supplies a name, the filename "data" is used.

    """
    datatype_name = "named_file"
    datatype_options = OrderedDict([
        ("filename", {
            "help": "The file's name",
        })
    ] + list(NamedFileCollection.datatype_options.items()))
    datatype_supports_python2 = True

    def __init__(self, *args, **kwargs):
        # Default to no filenames, but let an explicitly given "filenames" option through:
        # passing filenames=[] alongside **kwargs would raise a TypeError whenever the
        # caller supplied the option and would make the fallback below unreachable
        kwargs.setdefault("filenames", [])
        super(NamedFile, self).__init__(*args, **kwargs)

        self.filename = self.options["filename"]
        if self.filename is None:
            # Allow the "filenames" option to be used as well
            if len(self.filenames) > 1:
                raise ValueError("tried to instantiate NamedFile with multiple filenames in the 'filenames' option")
            elif len(self.filenames) == 1:
                self.filename = self.filenames[0]
            else:
                # Use a default filename if none is given
                self.filename = "data"
        # Set filenames from our filename
        self.filenames = [self.filename]

    class Reader(object):
        def process_setup(self):
            """ Run the parent's setup processing, then copy the single filename from the datatype. """
            super(NamedFile.Reader, self).process_setup()
            self.filename = self.datatype.filename

        @property
        def absolute_path(self):
            """ Absolute path to the single file. """
            return self.get_absolute_path(self.filename)

    class Writer(object):
        def __init__(self, *args, **kwargs):
            super(NamedFile.Writer, self).__init__(*args, **kwargs)
            self.filename = self.datatype.filename

        def write_file(self, data, text=False):
            """
            Write data to the single file. Unlike the parent's method, no filename is given.

            :param data: the data to write
            :param text: passed on to the parent's write_file()
            """
            super(NamedFile.Writer, self).write_file(self.filename, data, text=text)

        @property
        def absolute_path(self):
            """ Absolute path to the single file. """
            return self.get_absolute_path(self.filename)
# Alias FilesInput as FileInput: the default min no. files is 1, so this makes sense, but the
# alias is easier to read where only one file is expected.
# NOTE(review): no definition of FilesInput appears in the visible part of this file (only this
# alias and its entry in __all__) -- confirm that it is defined before this line.
FileInput = FilesInput
class TextFile(NamedFile):
    """
    Simple dataset containing just a single utf-8 encoded text file.

    """
    datatype_name = "text_document"
    datatype_options = OrderedDict([
        ("filename", {
            "help": "The file's name. Typically left as the default. Default: data.txt",
            "default": "data.txt",
        })
    ] + list(NamedFileCollection.datatype_options.items()))
    datatype_supports_python2 = True

    class Reader(object):
        def read_file(self, filename=None, mode="r", text=False):
            """
            Read the single file.

            :param filename: ignored, since there's only one file
            :param mode: mode to open the file in, passed on to the parent's read_file()
            :param text: passed on to the parent's read_file()
            :return: the contents of the file
            """
            # Ignore filename, since there's only one.
            # The mode is passed on rather than being silently dropped
            return super(TextFile.Reader, self).read_file(mode=mode, text=text)

    class Writer(object):
        def write_file(self, data, text=False):
            """
            Write data to the single file, by delegating to the parent's write_file().

            :param data: the data to write
            :param text: passed on to the parent's write_file()
            """
            super(TextFile.Writer, self).write_file(data, text=text)