Source code for sortinghat.core.jobs

# -*- coding: utf-8 -*-
#
# Copyright (C) 2014-2021 Bitergia
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
#     Santiago Dueñas <sduenas@bitergia.com>
#     Miguel Ángel Fernández <mafesan@bitergia.com>
#

import itertools
import logging

import django_rq
import django_rq.utils
import pandas
import rq

from .api import enroll, merge, update_profile
from .context import SortingHatContext
from .errors import BaseError, NotFoundError, EqualIndividualError
from .log import TransactionsLog
from .models import Individual
from .recommendations.engine import RecommendationEngine


MAX_CHUNK_SIZE = 2000


logger = logging.getLogger(__name__)


[docs]def find_job(job_id): """Find a job in the jobs registry. Search for a job using its identifier. When the job is not found, a `NotFoundError` exception is raised. :param job_id: job identifier :returns: a Job instance :raises NotFoundError: when the job identified by `job_id` is not found. """ logger.debug(f"Finding job {job_id} ...") queue = django_rq.get_queue() jobs = django_rq.utils.get_jobs(queue, [job_id]) if not jobs: logger.debug(f"Job with id {job_id} does not exist") raise NotFoundError(entity=job_id) logger.debug(f"Job with id {job_id} was found") return jobs[0]
[docs]def get_jobs(): """Get a list of all jobs This function returns a list of all jobs found in the main queue and its registries, sorted by date. :returns: a list of Job instances """ logger.debug("Retrieving list of jobs ...") queue = django_rq.get_queue() started_jobs = [find_job(id) for id in queue.started_job_registry.get_job_ids()] deferred_jobs = [find_job(id) for id in queue.deferred_job_registry.get_job_ids()] finished_jobs = [find_job(id) for id in queue.finished_job_registry.get_job_ids()] failed_jobs = [find_job(id) for id in queue.failed_job_registry.get_job_ids()] scheduled_jobs = [find_job(id) for id in queue.scheduled_job_registry.get_job_ids()] jobs = (queue.jobs + started_jobs + deferred_jobs + finished_jobs + failed_jobs + scheduled_jobs) sorted_jobs = sorted(jobs, key=lambda x: x.enqueued_at) logger.debug(f"List of jobs retrieved; total jobs: {len(sorted_jobs)};") return sorted_jobs
[docs]@django_rq.job def recommend_affiliations(ctx, uuids=None): """Generate a list of affiliation recommendations from a set of individuals. This function generates a list of recommendations which include the organizations where individuals can be affiliated. This job returns a dictionary with which individuals are recommended to be affiliated to which organization. Individuals are defined by any of their valid keys or UUIDs. When the parameter `uuids` is empty, the job will take all the individuals stored in the registry. :param ctx: context where this job is run :param uuids: list of individuals identifiers :returns: a dictionary with which individuals are recommended to be affiliated to which organization. """ job = rq.get_current_job() if not uuids: logger.info(f"Running job {job.id} 'recommend affiliations'; uuids='all'; ...") uuids = Individual.objects.values_list('mk', flat=True).iterator() else: logger.info(f"Running job {job.id} 'recommend affiliations'; uuids={uuids}; ...") uuids = iter(uuids) results = {} job_result = { 'results': results } engine = RecommendationEngine() # Create a new context to include the reference # to the job id that will perform the transaction. job_ctx = SortingHatContext(ctx.user, job.id) # Create an empty transaction to log which job # will generate the enroll transactions. trxl = TransactionsLog.open('recommend_affiliations', job_ctx) for chunk in _iter_split(uuids, size=MAX_CHUNK_SIZE): for rec in engine.recommend('affiliation', chunk): results[rec.key] = rec.options trxl.close() logger.info( f"Job {job.id} 'recommend affiliations' completed; " f"{len(results)} recommendations generated" ) return job_result
[docs]@django_rq.job def recommend_matches(ctx, source_uuids, target_uuids, criteria, verbose=False): """Generate a list of affiliation recommendations from a set of individuals. This function generates a list of recommendations which include the matching identities from the individuals which can be merged with. This job returns a dictionary with which individuals are recommended to be merged to which individual (or which identities is `verbose` mode is activated). Individuals both for `source_uuids` and `target_uuids` are defined by any of their valid keys or UUIDs. When the parameter `target_uuids` is empty, the recommendation engine will take all the individuals stored in the registry, so matches will be found comparing the identities from the individuals in `source_uuids` against all the identities on the registry. :param ctx: context where this job is run :param source_uuids: list of individuals identifiers to look matches for :param target_uuids: list of individuals identifiers where to look for matches :param criteria: list of fields which the match will be based on (`email`, `name` and/or `username`) :param verbose: if set to `True`, the match results will be composed by individual identities (even belonging to the same individual). :returns: a dictionary with which individuals are recommended to be merged to which individual or which identities. """ check_criteria(criteria) job = rq.get_current_job() logger.info(f"Running job {job.id} 'recommend matches'; criteria='{criteria}'; ...") results = {} job_result = { 'results': results } engine = RecommendationEngine() # Create a new context to include the reference # to the job id that will perform the transaction. job_ctx = SortingHatContext(ctx.user, job.id) trxl = TransactionsLog.open('recommend_matches', job_ctx) for rec in engine.recommend('matches', source_uuids, target_uuids, criteria, verbose): results[rec.key] = list(rec.options) trxl.close() logger.info( f"Job {job.id} 'recommend matches' completed; " f"{len(results)} recommendations generated" ) return job_result
[docs]@django_rq.job def recommend_gender(ctx, uuids): """Generate a list of gender recommendations from a set of individuals. This job generates a list of recommendations with the probable gender of the given individuals. :param ctx: context where this job is run :param uuids: list of individuals identifiers :returns: a dictionary with the recommended gender and accuracy of the prediction for each individual. """ job = rq.get_current_job() logger.info(f"Running job {job.id} 'recommend gender'; ...") results = {} job_result = { 'results': results } engine = RecommendationEngine() job_ctx = SortingHatContext(ctx.user, job.id) trxl = TransactionsLog.open('recommend_gender', job_ctx) for rec in engine.recommend('gender', uuids): results[rec.key] = {'gender': rec.options[0], 'accuracy': rec.options[1]} trxl.close() logger.info( f"Job {job.id} 'recommend gender' completed; " f"{len(results)} recommendations generated" ) return job_result
[docs]@django_rq.job def affiliate(ctx, uuids=None): """Affiliate a set of individuals using recommendations. This function automates the affiliation process obtaining a list of recommendations where individuals can be affiliated. After that, individuals are enrolled to them. This job returns a dictionary with which individuals were enrolled and the errors generated during this process. Individuals are defined by any of their valid keys or UUIDs. When the parameter `uuids` is empty, the job will take all the individuals stored in the registry. :param ctx: context where this job is run :param uuids: list of individuals identifiers :returns: a dictionary with which individuals were enrolled and the errors found running the job """ job = rq.get_current_job() if not uuids: logger.info(f"Running job {job.id} 'affiliate'; uuids='all'; ...") uuids = Individual.objects.values_list('mk', flat=True).iterator() else: logger.info(f"Running job {job.id} 'affiliate'; uuids={uuids}; ...") uuids = iter(uuids) results = {} errors = [] job_result = { 'results': results, 'errors': errors } engine = RecommendationEngine() # Create a new context to include the reference # to the job id that will perform the transaction. job_ctx = SortingHatContext(ctx.user, job.id) # Create an empty transaction to log which job # will generate the enroll transactions. trxl = TransactionsLog.open('affiliate', job_ctx) nsuccess = 0 for chunk in _iter_split(uuids, size=MAX_CHUNK_SIZE): for rec in engine.recommend('affiliation', chunk): affiliated, errs = _affiliate_individual(job_ctx, rec.key, rec.options) results[rec.key] = affiliated errors.extend(errs) if affiliated: nsuccess += 1 trxl.close() logger.info( f"Job {job.id} 'affiliate' completed; " f"{nsuccess} individuals have new affiliations" ) return job_result
[docs]@django_rq.job def unify(ctx, source_uuids, target_uuids, criteria): """Unify a set of individuals by merging them using matching recommendations. This function automates the identities unify process obtaining a list of recommendations where matching individuals can be merged. After that, matching individuals are merged. This job returns a list with the individuals which have been merged and the errors generated during this process. Individuals both for `source_uuids` and `target_uuids` are defined by any of their valid keys or UUIDs. When the parameter `target_uuids` is empty, the matches and the later merges will take place comparing the identities from the individuals in `source_uuids` against all the identities on the registry. :param ctx: context where this job is run :param source_uuids: list of individuals identifiers to look matches for :param target_uuids: list of individuals identifiers where to look for matches :param criteria: list of fields which the unify will be based on (`email`, `name` and/or `username`) :returns: a list with the individuals resulting from merge operations and the errors found running the job """ def _group_recommendations(recs): """Calculate unique sets of identities from matching recommendations. For instance, given a list of matching groups like A = {A, B}; B = {B,A,C}, C = {C,} and D = {D,} the output for keys A, B and C will be the group {A, B, C}. As D has no matches, it won't be included in any group and it won't be returned. :param recs: recommendations of matching identities :returns: a list including unique groups of matches """ groups = [] for group_key in recs: g_uuids = pandas.Series(recs[group_key]) g_uuids = g_uuids.append(pandas.Series([group_key])) g_uuids = list(g_uuids.sort_values().unique()) if (len(g_uuids) > 1) and (g_uuids not in groups): groups.append(g_uuids) return groups check_criteria(criteria) job = rq.get_current_job() logger.info(f"Running job {job.id} 'unify'; criteria='{criteria}'; ...") results = [] errors = [] job_result = { 'results': results, 'errors': errors } engine = RecommendationEngine() # Create a new context to include the reference # to the job id that will perform the transaction. job_ctx = SortingHatContext(ctx.user, job.id) trxl = TransactionsLog.open('unify', job_ctx) match_recs = {} for rec in engine.recommend('matches', source_uuids, target_uuids, criteria): match_recs[rec.key] = list(rec.options) match_groups = _group_recommendations(match_recs) # Apply the merge of the matching identities for group in match_groups: uuid = group[0] result = group[1:] merged_to, errs = _merge_individuals(job_ctx, uuid, result) if merged_to: results.append(merged_to) errors.extend(errs) trxl.close() logger.info( f"Job {job.id} 'unify' completed; " f"{len(results)} individuals have been merged" ) return job_result
[docs]@django_rq.job def genderize(ctx, uuids=None): """Assign a gender to a set of individuals using recommendations. This job autocompletes the gender information (stored in the profile) of unique identities after obtaining a list of recommendations for their gender based on their name. Individuals are defined by any of their valid keys or UUIDs. When the parameter `uuids` is empty, the job will take all the individuals stored in the registry. :param ctx: context where this job is run :param uuids: list of individuals identifiers :returns: a dictionary with which individual profiles were updated and the errors found running the job """ job = rq.get_current_job() if not uuids: logger.info(f"Running job {job.id} 'genderize'; uuids='all'; ...") uuids = Individual.objects.values_list('mk', flat=True).iterator() else: logger.info(f"Running job {job.id} 'genderize'; uuids={list(uuids)}; ...") uuids = iter(uuids) results = {} errors = [] job_result = { 'results': results, 'errors': errors } engine = RecommendationEngine() # Create a new context to include the reference # to the job id that will perform the transaction. job_ctx = SortingHatContext(ctx.user, job.id) # Create an empty transaction to log which job # will generate the enroll transactions. trxl = TransactionsLog.open('autogender', job_ctx) nsuccess = 0 for chunk in _iter_split(uuids, size=MAX_CHUNK_SIZE): for rec in engine.recommend('gender', chunk): gender, acc = rec.options updated, errs = _update_individual_gender(job_ctx, rec.key, rec.options) results[rec.key] = updated errors.extend(errs) if updated: nsuccess += 1 trxl.close() logger.info( f"Job {job.id} 'genderize' completed; " f"{nsuccess} individuals have been updated" ) return job_result
def _merge_individuals(job_ctx, source_indv, target_indvs): """Merge a set of individuals. Returns a tuple with two elements: list of the uuids from the individuals who were merged; list of errors found during the process. :param job_ctx: job context :param source_indv: valid individual identifier where the rest of individuals will be merged to :param target_indvs: list of identifiers of the individuals who will be merged with the source individual :returns: tuple with the uuid from the individual resulting from the merge operation (if any), and list of errors found during the process """ logger.debug( f"Merging individuals; " f"job={job_ctx.job_id} source={source_indv} target={target_indvs}; ..." ) errors = [] try: to_indv = merge(job_ctx, target_indvs, source_indv) except EqualIndividualError: # When source identity is already part of the destination, the merge is not applied to_indv = None pass except BaseError as exc: to_indv = None errors.append(str(exc)) to_indv = to_indv.mk if to_indv else None logger.debug( f"Individuals merging completed with {len(errors)} errors;" f"job={job_ctx.job_id} source={source_indv} target={target_indvs}" ) return to_indv, errors def _affiliate_individual(job_ctx, uuid, organizations): """Affiliate an individual to a list of organizations. Returns a tuple with two elements: list of the organizations the individual was enrolled to; list of the errors found during the process. :param job_ctx: job context :param uuid: valid individual identifier :param organizations: list of organization names :returns: tuple with the organizations affiliated to the i """ logger.debug( f"Affiliating individual; " f"job={job_ctx.job_id} uuid={uuid} organizations={organizations}; ..." ) affiliated = [] errors = [] for name in organizations: try: enroll(job_ctx, uuid, name) except BaseError as exc: errors.append(str(exc)) else: affiliated.append(name) logger.debug( f"Individual affiliation completed with {len(errors)} errors; " f"job={job_ctx.job_id} uuid={uuid} organizations={organizations}; ..." ) return affiliated, errors def _update_individual_gender(job_ctx, uuid, recommendation): errors = [] gender, gender_acc = recommendation logger.debug( f"Updating individual profile; " f"job={job_ctx.job_id} uuid={uuid} gender={gender} gender_acc={gender_acc}; ..." ) try: update_profile(job_ctx, uuid, gender=gender, gender_acc=gender_acc) except BaseError as exc: errors.append(str(exc)) logger.debug( f"Profile updated with {len(errors)} errors; " f"job={job_ctx.job_id} uuid={uuid} gender={gender} gender_acc={gender_acc}; ..." ) return recommendation, errors def _iter_split(iterator, size=None): """Split an iterator in chunks of the same size. When size is `None` the iterator will only return one chunk. :param iterator: iterator to split :param size: size of the chunk; :returns: generator of chunks """ # This code is based on Ashley Waite's answer to StackOverflow question # "split a generator/iterable every n items in python (splitEvery)" # (https://stackoverflow.com/a/44320132). try: while True: slice_iter = itertools.islice(iterator, size) peek = next(slice_iter) yield itertools.chain([peek], slice_iter) except StopIteration: return
[docs]def check_criteria(criteria): """ Check if all given criteria are valid. Raises an error if a criterion is not in the valid criteria list (`email`, `name` and/or `username`). :param criteria: list of criteria to check """ valid_criteria = ['name', 'email', 'username'] if any(criterion not in valid_criteria for criterion in criteria): raise ValueError(f"Invalid criteria {criteria}. Valid values are: {valid_criteria}")