Module gatenlp.processing.executor

Expand source code
from gatenlp.processing.pipeline import _has_method
from gatenlp.utils import init_logger

# Override table read by the pdoc documentation generator. Presumably the True
# value forces the otherwise-skipped ``Annotator.__call__`` into the generated
# docs -- confirm against the pdoc version in use.
# NOTE(review): no ``Annotator`` class is defined in the visible part of this
# module, so this entry may be a leftover copied from another module -- verify.
__pdoc__ = {"Annotator.__call__": True}


class SerialCorpusExecutor:
    """
    Runs an annotator (pipeline) serially on either a corpus or a document source.

    For a corpus without a destination, each document in the corpus gets processed and
    stored back in turn. For a corpus or a source with a destination, each document gets
    processed and all documents that are the result of processing get appended to the
    destination.

    The counters n_in, n_none, n_ok, n_err and n_out are updated while running.
    """

    def __init__(
        self,
        annotator,
        corpus=None,
        source=None,
        destination=None,
        readonly=False,
        exit_on_error=False,
        logger=None,
    ):
        """
        Creates an Executor to run an annotator on either a corpus or a document source.

        If a corpus is specified and no destination is specified, the annotator must return
        exactly one document (or None) for each document passed to it; the returned document
        gets stored back into the corpus, unless readonly is True.

        If a corpus is specified and a destination is specified, the corpus is iterated over in
        sequence, documents are processed by the annotator and all documents returned by the
        annotator are appended to the destination, unless readonly is True.

        If a document source is processed, the annotator can return zero, one or several
        documents for each document, which are appended to the destination (if one is
        specified) unless readonly is True.

        Args:
            annotator: the callable to run on each document. If it has the methods start and
              finish, they are called before the first and after the last document.
            corpus: the corpus to process.
            source: a document source to process. Corpus and source are mutually exclusive.
            destination: if specified, the result documents are appended to the destination unless
              readonly is True.
            readonly: if True, nothing is saved back to the corpus or appended to the destination.
            exit_on_error: if True, re-raise any exception thrown by the annotator; otherwise log
              the error, skip the document and continue.
            logger: logger to use, if None, uses a default logger

        Raises:
            Exception: unless exactly one of corpus or source is specified.
        """
        # Exactly one of the two inputs must be given: reject "neither" and "both".
        if (corpus is None) == (source is None):
            raise Exception("Exactly one of corpus or source must be specified")
        self.corpus = corpus
        self.source = source
        self.destination = destination
        self.annotator = annotator
        self.readonly = readonly
        self.exit_on_error = exit_on_error
        self.n_in = 0  # number of items read from the corpus/source (including None items)
        self.n_none = 0  # number of None items from the corpus/source plus None annotator results
        self.n_out = 0  # number of documents stored back into the corpus or appended to destination
        self.n_err = 0  # number of documents for which the annotator raised an exception
        self.n_ok = 0  # number of documents the annotator processed without an exception
        self.logger = logger or init_logger(__name__)

    def __call__(self, **kwargs):
        """
        Run the annotator over all documents of the corpus or source.

        Args:
            **kwargs: passed on to every call of the annotator.

        Returns:
            if annotator has a finish() method calls it and returns whatever it returns,
            otherwise None

        Raises:
            Exception: whatever the annotator raises, if exit_on_error is True; also if a corpus
              is processed without a destination and the annotator returns a list that does not
              contain exactly one document.
        """
        if _has_method(self.annotator, "start"):
            self.annotator.start()
        if self.corpus is not None:
            self._run_on_corpus(kwargs)
        else:
            self._run_on_source(kwargs)
        # NOTE: since this is single-threaded, no reduce call is necessary!
        if _has_method(self.annotator, "finish"):
            return self.annotator.finish()
        return None

    def _run_on_corpus(self, kwargs):
        """Process every corpus document, storing results back or appending to destination."""
        for idx, doc in enumerate(self.corpus):
            processed, ret = self._annotate(idx, doc, kwargs)
            if not processed:
                continue
            if self.destination is not None:
                self._append_results(ret)
                continue
            if ret is None:
                # Nothing to store back: the corpus element is left untouched.
                continue
            if isinstance(ret, list):
                if len(ret) != 1:
                    raise Exception(
                        "Cannot update corpus if Annotator returns not exactly one document"
                    )
                ret = ret[0]
            if not self.readonly:
                self.corpus[idx] = ret
                self.n_out += 1

    def _run_on_source(self, kwargs):
        """Process every source document, appending results to the destination if there is one."""
        for idx, doc in enumerate(self.source):
            processed, ret = self._annotate(idx, doc, kwargs)
            if processed and self.destination is not None:
                self._append_results(ret)

    def _annotate(self, idx, doc, kwargs):
        """Run the annotator on one item, update counters; return (processed, result)."""
        self.n_in += 1
        if doc is None:
            self.n_none += 1
            return False, None
        try:
            ret = self.annotator(doc, **kwargs)
        except Exception as ex:
            self.n_err += 1
            if self.exit_on_error:
                # Bare raise keeps the original traceback intact.
                raise
            # getattr: a missing name attribute must not mask the original error.
            docname = getattr(doc, "name", None)
            self.logger.error(
                f"Error processing document {idx}/{docname}",
                exc_info=ex,
                stack_info=True,
            )
            return False, None
        self.n_ok += 1
        if ret is None:
            self.n_none += 1
        return True, ret

    def _append_results(self, ret):
        """Append one document or a list of documents to the destination, honouring readonly."""
        if ret is None or self.readonly:
            return
        for result_doc in ret if isinstance(ret, list) else [ret]:
            self.destination.append(result_doc)
            self.n_out += 1

Classes

class SerialCorpusExecutor (annotator, corpus=None, source=None, destination=None, readonly=False, exit_on_error=False, logger=None)

Runs a pipeline on either a corpus, where each document in the corpus gets processed and stored back in turn, or on a source and destination, where each document from the source gets processed and all documents that are the result of processing get appended to the destination.

Creates an Executor to run an annotator on either a corpus or a document source. If a corpus is specified, and no destination is specified, the document passed on to the annotator must be returned by the annotator and gets stored back into the corpus, unless readonly is True.

If a corpus is specified and a destination is specified, the corpus is iterated over in sequence and documents are processed by the annotator and all documents returned by the annotator are appended to the destination.

If a document source is processed, the document gets processed and the annotator can return zero, one or several documents which are appended to the destination unless readonly is set to True.

An exception is thrown unless exactly one of corpus or source is specified.

Args

annotator
the callable to run on each document. If this is an instance of Annotator, the additional methods start, finish, and reduce are called as appropriate
corpus
the corpus to process.
source
a document source to process. Corpus and source are mutually exclusive.
destination
if specified, the result documents are appended to the destination unless readonly is True.
readonly
if True, nothing is saved back to the corpus or appended to the destination.
exit_on_error
if True, re-raise any exception thrown by the annotator; otherwise log the error, skip the document and continue
logger
logger to use, if None, uses a default logger

Returns

if annotator has a finish() method calls it and returns whatever it returns, otherwise None

Expand source code
class SerialCorpusExecutor:
    """
    Runs an annotator (pipeline) serially on either a corpus or a document source.

    For a corpus without a destination, each document in the corpus gets processed and
    stored back in turn. For a corpus or a source with a destination, each document gets
    processed and all documents that are the result of processing get appended to the
    destination.

    The counters n_in, n_none, n_ok, n_err and n_out are updated while running.
    """

    def __init__(
        self,
        annotator,
        corpus=None,
        source=None,
        destination=None,
        readonly=False,
        exit_on_error=False,
        logger=None,
    ):
        """
        Creates an Executor to run an annotator on either a corpus or a document source.

        If a corpus is specified and no destination is specified, the annotator must return
        exactly one document (or None) for each document passed to it; the returned document
        gets stored back into the corpus, unless readonly is True.

        If a corpus is specified and a destination is specified, the corpus is iterated over in
        sequence, documents are processed by the annotator and all documents returned by the
        annotator are appended to the destination, unless readonly is True.

        If a document source is processed, the annotator can return zero, one or several
        documents for each document, which are appended to the destination (if one is
        specified) unless readonly is True.

        Args:
            annotator: the callable to run on each document. If it has the methods start and
              finish, they are called before the first and after the last document.
            corpus: the corpus to process.
            source: a document source to process. Corpus and source are mutually exclusive.
            destination: if specified, the result documents are appended to the destination unless
              readonly is True.
            readonly: if True, nothing is saved back to the corpus or appended to the destination.
            exit_on_error: if True, re-raise any exception thrown by the annotator; otherwise log
              the error, skip the document and continue.
            logger: logger to use, if None, uses a default logger

        Raises:
            Exception: unless exactly one of corpus or source is specified.
        """
        # Exactly one of the two inputs must be given: reject "neither" and "both".
        if (corpus is None) == (source is None):
            raise Exception("Exactly one of corpus or source must be specified")
        self.corpus = corpus
        self.source = source
        self.destination = destination
        self.annotator = annotator
        self.readonly = readonly
        self.exit_on_error = exit_on_error
        self.n_in = 0  # number of items read from the corpus/source (including None items)
        self.n_none = 0  # number of None items from the corpus/source plus None annotator results
        self.n_out = 0  # number of documents stored back into the corpus or appended to destination
        self.n_err = 0  # number of documents for which the annotator raised an exception
        self.n_ok = 0  # number of documents the annotator processed without an exception
        self.logger = logger or init_logger(__name__)

    def __call__(self, **kwargs):
        """
        Run the annotator over all documents of the corpus or source.

        Single-threaded, so no reduce call on the annotator is necessary.

        Args:
            **kwargs: passed on to every call of the annotator.

        Returns:
            if annotator has a finish() method calls it and returns whatever it returns,
            otherwise None

        Raises:
            Exception: whatever the annotator raises, if exit_on_error is True; also if a corpus
              is processed without a destination and the annotator returns a list that does not
              contain exactly one document.
        """
        if _has_method(self.annotator, "start"):
            self.annotator.start()
        if self.corpus is not None:
            self._run_on_corpus(kwargs)
        else:
            self._run_on_source(kwargs)
        if _has_method(self.annotator, "finish"):
            return self.annotator.finish()
        return None

    def _run_on_corpus(self, kwargs):
        """Process every corpus document, storing results back or appending to destination."""
        for idx, doc in enumerate(self.corpus):
            processed, ret = self._annotate(idx, doc, kwargs)
            if not processed:
                continue
            if self.destination is not None:
                self._append_results(ret)
                continue
            if ret is None:
                # Nothing to store back: the corpus element is left untouched.
                continue
            if isinstance(ret, list):
                if len(ret) != 1:
                    raise Exception(
                        "Cannot update corpus if Annotator returns not exactly one document"
                    )
                ret = ret[0]
            if not self.readonly:
                self.corpus[idx] = ret
                self.n_out += 1

    def _run_on_source(self, kwargs):
        """Process every source document, appending results to the destination if there is one."""
        for idx, doc in enumerate(self.source):
            processed, ret = self._annotate(idx, doc, kwargs)
            if processed and self.destination is not None:
                self._append_results(ret)

    def _annotate(self, idx, doc, kwargs):
        """Run the annotator on one item, update counters; return (processed, result)."""
        self.n_in += 1
        if doc is None:
            self.n_none += 1
            return False, None
        try:
            ret = self.annotator(doc, **kwargs)
        except Exception as ex:
            self.n_err += 1
            if self.exit_on_error:
                # Bare raise keeps the original traceback intact.
                raise
            # getattr: a missing name attribute must not mask the original error.
            docname = getattr(doc, "name", None)
            self.logger.error(
                f"Error processing document {idx}/{docname}",
                exc_info=ex,
                stack_info=True,
            )
            return False, None
        self.n_ok += 1
        if ret is None:
            self.n_none += 1
        return True, ret

    def _append_results(self, ret):
        """Append one document or a list of documents to the destination, honouring readonly."""
        if ret is None or self.readonly:
            return
        for result_doc in ret if isinstance(ret, list) else [ret]:
            self.destination.append(result_doc)
            self.n_out += 1