Commits on Source (20)
......@@ -51,7 +51,9 @@ dump*
resources/data/bdap_anagrafe_enti.csv
resources/data/parsers
resources/data/out
resources/data/atoka
resources/data/items.json
/resources/data/discrepancies.csv
resources/data/role_types_atoka_opdm.csv
/resources/data/cache
docker-compose.*.yml
......@@ -7,7 +7,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]
## [1.0.1]
### Added
- ETL tasks for Atoka integration
- maintenance script for organizations missing classifications
- maintenance script for duplicate organization merge
- data changelog and main changelog URLs and views added and linked from the about page
- verification of the administrative history of an institution from Minint
......
......@@ -3,4 +3,4 @@
Openpolis Data Manager service package (backend)
"""
__version__ = '1.0.0'
__version__ = '1.0.1'
......@@ -30,7 +30,7 @@ from haystack import signals
from project.connections.atoka import AtokaConn, AtokaObjectDoesNotExist, AtokaException
codicefiscale.__DATA["municipalities"].update(
codicefiscale._DATA["municipalities"].update(
{
"donada": {"code": "D337", "province": "RO", "name": "DONADA"},
"terzo-d-aquileia": {"code": "L144", "province": "UD", "name": "TERZO D'AQUILEIA"},
......@@ -49,7 +49,7 @@ codicefiscale.__DATA["municipalities"].update(
}
)
codicefiscale.__DATA["countries"].update(
codicefiscale._DATA["countries"].update(
{
"ex-repubblica-jugoslava-di-macedonia": {
"code": "Z148",
......@@ -90,6 +90,22 @@ def get_orgs_dict_by_org_id(classifications_labels):
return ret
def get_org_by_tax_id_dict(tax_ids: list) -> dict:
"""Build a mapping: tax_id => pk
:param tax_ids:
:return:
"""
orgs = Organization.objects.filter(
identifier__in=tax_ids,
).values("id", "identifier", "name", "classification")
ret = {}
for org in orgs:
ret[org["identifier"]] = {"id": org["id"], "classification": org["classification"], "name": org["name"]}
return ret
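A minimal usage sketch (editor's addition, not part of the diff); the tax ids are placeholders borrowed from the tests further down:

    orgs_by_tax_id = get_org_by_tax_id_dict(["02438750586", "00008010803"])
    info = orgs_by_tax_id.get("02438750586")
    if info is not None:
        print(info["id"], info["name"], info["classification"])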
def get_roletypes_dict_by_role(classifications_labels):
"""
......@@ -359,6 +375,28 @@ def get_organs_dict_by_city_prov():
return ret
def batch_generator(batchsize, iterator):
"""Creates a generator containing batches of values, sized `batchsize`, out of iterator
:param batchsize: size of each batch
:param iterator: list of original values
:return: a generator yielding a list of values each time
"""
while True:
pk_buffer = []
try:
# Consume queryset iterator until batch is reached or the
# iterator has been exhausted.
while len(pk_buffer) < batchsize:
pk_buffer.append(next(iterator))
except StopIteration:
# Break out of the loop once the queryset has been consumed.
break
finally:
# yield the batch of pks
yield pk_buffer
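A hedged usage sketch (editor's addition). Note that when the iterator's length is an exact multiple of `batchsize`, the `finally` clause yields one trailing empty batch before the loop breaks, so consumers may want to skip empty batches:

    values = iter(range(100))
    for batch in batch_generator(25, values):
        if not batch:  # trailing empty batch when len % batchsize == 0
            continue
        print(len(batch))  # prints 25 four times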
class UnprocessableEntitytAPIException(APIException):
status_code = 422
default_detail = "The request was formally correct, but not acceptable."
......@@ -1056,11 +1094,23 @@ class PersonUtils(object):
else:
for m in item["memberships"]:
# skip memberships with no start_date
if 'start_date' not in m or m['start_date'] is None:
if logger:
logger.error("No start date for membership {0}".format(m))
continue
# get or create the Post this role refers to, from role and organization_id
# memberships must have been "annotated" with
# Electoral KeyEvent, Organization and RoleType instances during transform()
organization = Organization.objects.get(pk=m["organization_id"])
role_type = RoleType.objects.get(label__iexact=m["role"])
try:
role_type = RoleType.objects.get(label__iexact=m["role"])
except RoleType.DoesNotExist:
if logger:
logger.error("RoleType could be found for {0}".format(m["role"]))
continue
electoral_event_id = m.get("electoral_event_id", None)
if electoral_event_id:
electoral_event = KeyEvent.objects.get(id=electoral_event_id)
......@@ -1230,8 +1280,8 @@ class PersonUtils(object):
# not necessary and contains non-serializable keys
if "memberships" in loader_context["item"]:
for m in loader_context["item"]["memberships"]:
m.pop("organization")
m.pop("role_type")
m.pop("organization", None)
m.pop("role_type", None)
aka, created = AKA.objects.update_or_create(
search_params=search_params,
......@@ -1457,7 +1507,7 @@ class OrganizationUtils(object):
index.update_object(item)
@classmethod
def org_anagraphical_lookup(cls, item, logger):
def org_anagraphical_lookup(cls, item, logger, current=False):
"""anagraphical lookup strategy implementation
:param item: the item to lookup in the DB
......@@ -1467,7 +1517,10 @@ class OrganizationUtils(object):
org_id = 0
basic_filters = {"name__iexact": item["name"], "identifier": item["identifier"]}
if current:
basic_filters.update({"dissolution_date__isnull": True})
discrimination_filter = {"founding_date": item.get("founding_date", None)}
filters = basic_filters
try:
org = Organization.objects.get(**filters)
......@@ -1487,7 +1540,7 @@ class OrganizationUtils(object):
return org_id
@classmethod
def org_identifier_lookup(cls, item, identifier_scheme):
def org_identifier_lookup(cls, item, identifier_scheme, current=False):
"""identifier lookup strategy implementation
:param item: the item to lookup in the DB
......@@ -1507,6 +1560,10 @@ class OrganizationUtils(object):
filters = {"identifiers__scheme": scheme, "identifiers__identifier": identifier}
else:
filters = {"identifier": item["identifier"]}
if current:
filters.update({"dissolution_date__isnull": True})
try:
org = Organization.objects.get(**filters)
org_id = org.id
......@@ -1521,7 +1578,7 @@ class OrganizationUtils(object):
def org_lookup(cls, item, strategy, **kwargs):
"""
:param item:
:param strategy: lookup strategy (anagraphical, identifier)
:param strategy: lookup strategy (anagraphical, identifier, mixed, anagraphical_current, ...)
:param kwargs: other params
- identifier_scheme
- logger
......@@ -1533,17 +1590,23 @@ class OrganizationUtils(object):
identifier_scheme = kwargs.get("identifier_scheme", None)
logger = kwargs.get("logger", None)
try:
strategy, current = strategy.split("_")
current = True
except ValueError:
strategy, current = strategy, False
if strategy == "anagraphical":
org_id = cls.org_anagraphical_lookup(item, logger)
org_id = cls.org_anagraphical_lookup(item, logger, current=current)
elif strategy in ["identifier", "mixed"]:
# identifier lookup strategy: use given scheme or lookup on natural identifier (CF)
# if not found a try with the anagraphical lookup strategy is done
org_id = cls.org_identifier_lookup(item, identifier_scheme)
org_id = cls.org_identifier_lookup(item, identifier_scheme, current=current)
# mixed strategy: try the anagraphical lookup strategy if identifier lookup failed
if org_id == 0 and strategy == "mixed":
org_id = cls.org_anagraphical_lookup(item, logger)
org_id = cls.org_anagraphical_lookup(item, logger, current=current)
else:
raise Exception("accepted values for lookup_strategy are: anagraphical, identifier, mixed")
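A hedged sketch of the accepted strategy values (editor's addition; the item is a placeholder shaped like those used in the tests below). The `_current` suffix is split off and restricts the lookup to organizations with a null dissolution_date:

    item = {"name": "COMUNE DI ROMA", "identifier": "02438750586"}
    org_id = OrganizationUtils.org_lookup(item, "mixed_current", identifier_scheme=None, logger=None)
    if org_id == 0:
        pass  # no match found; callers typically create the organization anew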
......@@ -1765,7 +1828,7 @@ class SearchUtils(object):
family_name = family_name.replace(" ", "\\ ")
given_name = given_name.replace(" ", "\\ ")
if birth_location:
birth_location = birth_location.replace(" ", "\\ ").replace("(", "\(").replace(")", "\)")
birth_location = birth_location.replace(" ", "\\ ").replace("(", "\\(").replace(")", "\\)")
# raw fuzzy search query building
raw_fuzzy_search_query = ""
......
import itertools
import json
from io import BytesIO
from typing import Type, Union
import pandas as pd
import requests
from opdmetl.extractors import CSVExtractor, RemoteExtractor, Extractor
from project.connections.atoka import AtokaConn
from project.connections.atoka import AtokaConn, AtokaObjectDoesNotExist
class ListExtractor(Extractor):
......@@ -47,17 +50,25 @@ class JsonArrayExtractor(RemoteExtractor):
super().__init__(remote_url)
def extract(self, **kwargs):
"""Extract content
"""Extract content from json source, be it a remote url or a local file
"""
self.logger.info("Fetching data from {0}".format(self.remote_url))
resp = requests.get(self.remote_url)
if resp.status_code == 200:
data = resp.json()
results = data
else:
results = []
try:
resp = requests.get(self.remote_url)
if resp.status_code == 200:
data = resp.json()
results = data
else:
results = []
except requests.exceptions.MissingSchema:
try:
with open(self.remote_url, "r") as json_file:
results = json.load(json_file)
except Exception as e:
self.logger.info("{0}. Data could not be fetched.".format(e))
results = []
return results
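A usage sketch (editor's addition): the extractor now accepts either a remote URL or a local path; requests raises MissingSchema for the latter, which triggers the local-file branch. The URL is a placeholder:

    remote_items = JsonArrayExtractor("https://example.com/data.json").extract()
    local_items = JsonArrayExtractor("./resources/data/atoka/atoka.json").extract()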
......@@ -184,8 +195,7 @@ class UACSVExtractor(CSVExtractor):
class AtokaOwnershipsExtractor(Extractor):
""":class:`Extractor` for extractions of ownerships information out of the Atoka API
It is just a tiny wrapper around the `AtokaConn.extract_ownerships_from_tax_ids` method,
so that this can be used inside the `opdmetl` framework.
Uses methods of the `AtokaConn` class to extract information from the Atoka API.
"""
def __init__(self, batch: list):
......@@ -200,8 +210,280 @@ class AtokaOwnershipsExtractor(Extractor):
self.batch = batch
super().__init__()
def extract(self, **kwargs):
"""Extract content
def extract(self, **kwargs) -> dict:
"""Extract meaningful information from Atoka.
Given a list of tax_ids in self.batch, queries the Atoka API to retrieve owned shares and roles.
Ownerships are returned as a list.
Each element in the list represents an **owner**, and has major identifiers, classifications and
**owned organizations**.
Owned organizations are embedded in the `shares_owned` list.
Each element of `shares_owned` contains identifiers, classifications and
**people having roles** in the organization.
[
{
'atoka_id': owner_atoka_id,
'tax_id': owner_tax_id,
'other_atoka_ids': [ ... ],
'name': owner_name,
'legal_form': owner_atoka_legal_form_level2,
'rea': owner_rea_id,
'cciaa': owner_cciaa,
'shares_owned': [
{
'name': owned_name,
'founded': owned_founded,
'atoka_id': owned_atoka_id,
'tax_id': owned_tax_id,
'rea': owner_rea_id,
'cciaa': owner_cciaa,
'legal_form': owned_atoka_legal_form_level2,
'ateco': owned_ateco,
'percentage': ratio * 100,
'last_update': share_last_update,
'roles': [
{
'person': {
'family_name': familyName,
'given_name': givenName,
'gender': gender,
'birth_date': birthDate,
'birth_location': birthPlace,
'name': name,
'atoka_id':id,
'tax_id': taxId,
},
'label': role_name,
'start_date': role_since
},
...
]
},
...
]
},
...
]
:return: a dict with request counts and ids under `meta`, and the list of owners, with embedded ownership and role details, under `results`
"""
self.logger.debug("Fetching data from atoka for this batch: {0}".format(",".join(self.batch)))
return AtokaConn().extract_ownerships_from_tax_ids(self.batch)
tax_ids = self.batch
atoka_conn = AtokaConn()
atoka_companies_requests = 0
atoka_people_requests = 0
# fetch all companies among the list having govType values set
# will need batch_size=1 here, because shares may contain many results
# and the limit is 50
try:
res_tot = atoka_conn.get_companies_from_tax_ids(
tax_ids, packages='base,shares', active='true', batch_size=1
)
atoka_companies_requests += len(res_tot)
self.logger.debug(
"- da {0} tax_ids, ricevuti da Atoka dettagli e quote per {1} istituzioni "
"(ci possono essere doppioni)".format(
len(tax_ids), len(res_tot)
)
)
except AtokaObjectDoesNotExist:
res_tot = []
# return multiple results for single tax_ids, if any
res_doubles = {}
for r in res_tot:
if r['base']['taxId'] not in res_doubles:
res_doubles[r['base']['taxId']] = []
res_doubles[r['base']['taxId']].append(r['id'])
res_doubles = {
k: v
for k, v in res_doubles.items() if len(v) > 1
}
# remove owners with no shares, or shares to non-active companies
def check_owner_has_shares_in_active_companies(owner_org):
if 'shares' in owner_org and 'sharesOwned' in owner_org['shares']:
properties_to_active = list(filter(
lambda x: x['active'] is True and x['typeOfRight'] == 'proprietà' and 'ratio' in x,
owner_org['shares']['sharesOwned']
))
if len(properties_to_active):
return True
return False
res_tot = list(filter(check_owner_has_shares_in_active_companies, res_tot))
self.logger.debug(
"- {0} istituzioni hanno partecipazioni in aziende attive".format(
len(res_tot)
)
)
# transform the results into a dict
res_dict = {
r['base']['taxId']: {
'atoka_id': r['id'],
'other_atoka_ids': [
atoka_id for atoka_id in res_doubles[r['base']['taxId']] if atoka_id != r['id']
] if r['base']['taxId'] in res_doubles else [],
'name': r['name'],
'legal_form': [x['name'] for x in r['base']['legalForms'] if x['level'] == 2][0],
'rea': r['base'].get('rea', None),
'cciaa': r['base'].get('cciaa', None),
'shares_owned': [
{
'name': sho['name'],
'last_updated': sho['lastUpdate'],
'atoka_id': sho['id'],
'percentage': sho['ratio'] * 100.
}
for sho in filter(
lambda x: x['active'] is True and x['typeOfRight'] == 'proprietà' and 'ratio' in x,
r['shares']['sharesOwned']
)
]
} for r in res_tot
}
# extract all atoka_ids from shares_owned elements and returns flat list
# then apply list(set(x)) to remove duplicates, if any
owned_atoka_ids = list(set(list(itertools.chain.from_iterable([
[x['atoka_id'] for x in r['shares_owned']]
for r in res_dict.values()
]))))
owned_orgs = atoka_conn.get_companies_from_atoka_ids(
owned_atoka_ids, packages='base', active='true', batch_size=1
)
atoka_companies_requests += len(owned_orgs)
self.logger.debug("- ricevuti dettagli per {0} partecipate".format(len(owned_orgs)))
owned_orgs_dict = {
r['id']: {
'name': r['name'],
'cciaa': r['base'].get('cciaa', None),
'rea': r['base'].get('rea', None),
'tax_id': r['base'].get('taxId', None),
'vat': r['base'].get('vat', None),
'founded': r['base'].get('founded', None),
'legal_form': [x['name'] for x in r['base']['legalForms'] if x['level'] == 2][0],
} for r in owned_orgs
}
# extract all people's atoka_ids from res_owned elements and returns flat list, removing duplicates
people = atoka_conn.get_roles_from_atoka_ids(
owned_atoka_ids, packages='base,companies',
companiesRolesOfficial='true', companiesRoles=atoka_conn.allowed_roles
)
atoka_people_requests += len(people)
def extract_birth_place(base: dict) -> Union[str, None]:
"""
:param base:
:return:
"""
if 'birthPlace' not in base:
return None
if 'municipality' in base['birthPlace']:
return "{0} ({1})".format(
base['birthPlace']['municipality'],
base['birthPlace']['provinceCode']
)
else:
return "{0} ({1})".format(base['birthPlace']['state'], base['birthPlace']['stateCode'])
people_ids = []
people_dict = {}
for person in people:
if (
'base' in person and
'gender' in person['base'] and
'familyName' in person['base'] and
'givenName' in person['base'] and
person['id'] not in people_ids
):
people_ids.append(person['id'])
people_dict[person['id']] = {
'given_name': person['base']['givenName'],
'family_name': person['base']['familyName'],
'gender': person['base']['gender'],
'birth_date': person['base'].get('birthDate', None),
'birth_location': extract_birth_place(person['base']),
'tax_id': person['base'].get('taxId', None),
'companies': [
{
'id': x['id'],
'roles': [
xr for xr in x['roles']
if 'official' in xr and 'name' in xr
]
}
for x in person['companies']['items'] if x['id'] in owned_atoka_ids
]
}
self.logger.debug(
"- ricevuti dettagli per {0} persone ({1} distinte)".format(
len(people), len(people_ids)
)
)
# upgrade owned_orgs_dict with roles, from people_dict
for atoka_id, person in people_dict.items():
person['atoka_id'] = atoka_id
for company in person.pop('companies', []):
org = owned_orgs_dict.get(company['id'])
if org:
if 'roles' not in org:
org['roles'] = []
org['roles'].extend([
{
'person': person,
'label': org_role['name'],
'start_date': org_role.get('since', None)
}
for org_role in company['roles']
])
else:
self.logger.warning(
"! azienda {0} richiesta ad atoka nel calcolo dei ruoli per {1}, "
"ma non presente nei risultati".format(company['id'], person['atoka_id'])
)
# upgrade res_dict values with details values
for tax_id, owner in res_dict.items():
for owned in owner['shares_owned']:
owned_details = owned_orgs_dict.get(owned['atoka_id'], None)
if owned_details:
for f in ['name', 'cciaa', 'rea', 'tax_id', 'vat', 'founded', 'legal_form', 'roles']:
if f in owned_details:
owned[f] = owned_details[f]
else:
self.logger.warning(
"! organizzazione {0} richiesta ad atoka, "
"ma non presente nei risultati".format(owned['atoka_id'])
)
# returns a list
results = []
for tax_id, result in res_dict.items():
result['tax_id'] = tax_id
results.append(result)
return {
'meta': {
'atoka_requests': {
'people': atoka_people_requests,
'companies': atoka_companies_requests
},
'ids': {
'people': people_ids,
'companies': owned_atoka_ids
}
},
'results': results
}
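A hedged sketch (editor's addition) of how this return value is consumed by the management command below: `meta` carries request accounting and collected ids, `results` the owners:

    res = AtokaOwnershipsExtractor(["02438750586"]).extract()
    companies_requests = res["meta"]["atoka_requests"]["companies"]
    for owner in res["results"]:
        for share in owner["shares_owned"]:
            print(owner["tax_id"], share["atoka_id"], share["percentage"])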
......@@ -119,8 +119,8 @@ class PopoloOrgLoader(PopoloLoader):
self.update_strategy = kwargs.get('update_strategy', 'keep_old')
def load_item(self, item, **kwargs):
"""load Person into the Popolo models
lookup a person, with strategy defined in self.lookup_strategy
"""load Organization into the Popolo models
lookup an organization, with strategy defined in self.lookup_strategy
invoke update_or_create_from_item (anagraphical data plus identifiers, contacts, ...)
:param item: the item to be loaded
......
......@@ -501,7 +501,7 @@ class PopoloPersonWithMembershipsLoader(PopoloPersonLoader):
loader_context = {
'module': __name__,
'class': self.__class__.__name__,
'data_source': self.etl.extractor.remote_url,
'data_source': getattr(self.etl.extractor, "remote_url", self.etl.source),
'context': self.context,
'check_membership_label': self.check_membership_label,
'item': item
......
# -*- coding: utf-8 -*-
from django.db.models import Count
from opdmetl import ETL
import json
from popolo.models import Organization
from taskmanager.utils import LoggingBaseCommand
from project.api_v1.etl.extractors import AtokaOwnershipsExtractor, ListExtractor
from project.api_v1.etl.loaders.organizations import PopoloOrgOwnershipLoader, PopoloOrgLoader
from project.api_v1.etl.transformations.atoka import AtokaOwnershipTransformation, AtokaOwnershipOrgTransformation
from project.api_v1.core import batch_generator
from project.api_v1.etl.extractors import AtokaOwnershipsExtractor
class Command(LoggingBaseCommand):
help = "Import ownerships among organizations from ATOKA's API"
help = "Extract all needed info from ATOKA's API and store them as an array into a local JSON file"
def add_arguments(self, parser):
parser.add_argument(
"--batch-size",
dest="batchsize", type=int,
default=10,
default=25,
help="Size of the batch of organizations processed at once",
)
@staticmethod
def batch_generator(batchsize, iterator):
"""Creates a generator containing batches of values, sized `batchsize`, out of iterator
:param batchsize: size of each batch
:param iterator: list of original values
:return: a generator yielding a list of values each time
"""
while True:
pk_buffer = []
try:
# Consume queryset iterator until batch is reached or the
# iterator has been exhausted.
while len(pk_buffer) < batchsize:
pk_buffer.append(next(iterator))
except StopIteration:
# Break out of the loop once the queryset has been consumed.
break
finally:
# yield the batch of pks
yield pk_buffer
parser.add_argument(
"--offset",
dest="offset", type=int,
default=0,
help="Start processing",
)
parser.add_argument(
"--classifications",
nargs='*', metavar='CLASS',
dest="classifications", type=int,
help="Only process specified classifications "
"(by id, ex: Ministero, Consiglio Reg., Città Metrop.: 498,133,279)",
)
parser.add_argument(
"--json-file",
dest="jsonfile",
default="./resources/out/atoka.json",
help="Complete path to json file"
)
def handle(self, *args, **options):
self.setup_logger(__name__, formatter_key='simple', **options)
batchsize = options['batchsize']
offset = options['offset']
jsonfile = options['jsonfile']
classifications = options.get('classifications', [])
self.logger.info("Start procedure")
......@@ -60,66 +59,86 @@ class Command(LoggingBaseCommand):
]
).filter(identifier__isnull=False)
if classifications:
organizations_qs = organizations_qs.filter(classifications__classification_id__in=classifications)
# group organizations by classification counting occurrences
organizations_groups = [
{
'descr': x['classifications__classification__descr'],
'id': x['classifications__classification_id'],
'n': x['n']
'n': 0
}
for x in list(organizations_qs.values(
'classifications__classification__descr',
'classifications__classification_id'
).distinct().annotate(
n=Count('classifications__classification_id')
).order_by('-n'))
).distinct())
]
# # test single batch of 10 on first org group (comuni)
# organizations_group = organizations_groups[0]
# self.logger.info('processing {0} organizations classified as {1}'.format(
# organizations_group['n'], organizations_group['descr']
# ))
# batches = self.batch_generator(
# batchsize, organizations_qs.filter(
# classifications__classification_id=organizations_group['id']
# ).values_list('identifier', flat=True).distinct().iterator()
# )
for group in organizations_groups:
group['n'] = organizations_qs.filter(classifications__classification_id=group['id']).count()
organizations_groups = sorted(organizations_groups, key=lambda x: x['n'] * -1)
atoka_records = []
atoka_companies_requests = 0
atoka_people_requests = 0
people_ids = []
owned_ids = []
counter = 0
for organizations_group in organizations_groups:
self.logger.info('processing {0} organizations classified as {1}'.format(
organizations_group['n'], organizations_group['descr']
))
# generate batches of batchsize, to query atoka's endpoint
batches = self.batch_generator(
batches = batch_generator(
batchsize, organizations_qs.filter(
classifications__classification_id=organizations_group['id']
).values_list('identifier', flat=True).distinct().iterator()
)
counter = 0
group_counter = 0
for tax_ids_batch in batches:
# extract atoka_ownerships into a list in memory,
# in order to use it in the two following ETL procedures
atoka_ownerships = AtokaOwnershipsExtractor(tax_ids_batch).extract()
# update or create organizations taken from atoka_ownerships
self.logger.info("organizations")
ETL(
extractor=ListExtractor(atoka_ownerships),
transformation=AtokaOwnershipOrgTransformation(),
loader=PopoloOrgLoader(lookup_strategy='identifier', identifier_scheme=None),
log_level=self.logger.level,
)()
# update or create ownerships taken from atoka_ownerships
self.logger.info("ownerships")
ETL(
extractor=ListExtractor(atoka_ownerships),
transformation=AtokaOwnershipTransformation(),
loader=PopoloOrgOwnershipLoader(lookup_strategy='identifier', identifier_scheme=None),
log_level=self.logger.level,
)()
counter = counter + len(tax_ids_batch)
self.logger.info("{0} organizations processed out of {1}".format(counter, organizations_group['n']))
# implement offset
if counter >= offset:
atoka_extractor = AtokaOwnershipsExtractor(tax_ids_batch)
atoka_extractor.logger = self.logger
atoka_res = atoka_extractor.extract()
atoka_records.extend(atoka_res['results'])
atoka_companies_requests += atoka_res['meta']['atoka_requests']['companies']
atoka_people_requests += atoka_res['meta']['atoka_requests']['people']
people_ids.extend(atoka_res['meta']['ids']['people'])
people_ids = list(set(people_ids))
owned_ids.extend(atoka_res['meta']['ids']['companies'])
owned_ids = list(set(owned_ids))
group_counter += len(tax_ids_batch)
self.logger.info("[{0}]: {1} tax_ids, {2} partecipate, {3} persone --------".format(
organizations_group['descr'], group_counter, len(owned_ids), len(people_ids)
))
else:
self.logger.info("[{0}]: skipping {1} tax_ids".format(
organizations_group['descr'], len(tax_ids_batch)
))
self.logger.debug("")
counter += len(tax_ids_batch)
self.logger.info(
"crediti spesi con atoka: {0} companies, {1} people".format(
atoka_companies_requests, atoka_people_requests
)
)
# produce the json file
self.logger.info("Dati scritti in {0}".format(jsonfile))
with open(jsonfile, "w") as f:
json.dump(atoka_records, f)
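A hedged invocation sketch (editor's addition). The command's registered name is not visible in this diff, so `script_atoka_extract` is a hypothetical name; the keyword arguments map to the `dest` values of the parser arguments above:

    from django.core.management import call_command

    call_command(
        "script_atoka_extract",  # hypothetical command name
        batchsize=25,
        offset=0,
        classifications=[498, 133, 279],
        jsonfile="./resources/out/atoka.json",
    )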
# -*- coding: utf-8 -*-
import os
from loaders import JsonLoader
from opdmetl import ETL
from taskmanager.utils import LoggingBaseCommand
from project.api_v1.etl.extractors import JsonArrayExtractor, ListExtractor
from project.api_v1.etl.transformations.atoka import \
AtokaOwnershipTransformation, \
AtokaMembershipTransformation, \
AtokaOrganizationTransformation
class Command(LoggingBaseCommand):
help = "Transform information from ATOKA, loading them from a JSON file, producing files for successive purposes"
def add_arguments(self, parser):
parser.add_argument(
"--json-source-file",
dest="json_src",
default="./resources/data/atoka/atoka.json",
help="Complete path to source file with atoka info"
)
parser.add_argument(
"--json-output-path",
dest="json_out_path",
default="./resources/data/atoka",
help="Path to directory where emitted json files will be stored"
)
def handle(self, *args, **options):
self.setup_logger(__name__, formatter_key='simple', **options)
json_src = options['json_src']
json_out_path = options['json_out_path']
self.logger.info("Start procedure")
atoka_records = JsonArrayExtractor(json_src).extract()
self.logger.info("organizations")
ETL(
extractor=ListExtractor(atoka_records),
transformation=AtokaOrganizationTransformation(),
loader=JsonLoader(os.path.join(json_out_path, "atoka_organizations.json")),
log_level=self.logger.level,
# source="http://api.atoka.it"
)()
self.logger.info("ownerships")
ETL(
extractor=ListExtractor(atoka_records),
transformation=AtokaOwnershipTransformation(),
loader=JsonLoader(os.path.join(json_out_path, "atoka_ownerships.json")),
log_level=self.logger.level,
# source="http://api.atoka.it"
)()
# update or create memberships taken from atoka_records
self.logger.info("memberships")
ETL(
extractor=ListExtractor(atoka_records),
transformation=AtokaMembershipTransformation(),
loader=JsonLoader(os.path.join(json_out_path, "atoka_person_memberships.json")),
log_level=self.logger.level,
# source="http://api.atoka.it"
)()
self.logger.info("Stop procedure")
......@@ -26,15 +26,23 @@ class Command(BaseCommand):
"--lookup-strategy",
dest="lookup_strategy",
default="mixed",
help="Whether to lookup using mixed (name and start date), "
"identifier (see --identifier-scheme) or mixed (identifier first, then cascade to anagraphical)",
help="Whether to lookup using anagraphical (name and start date), "
"identifier (see --identifier-scheme) or mixed (identifier first, then cascade to anagraphical). "
"Add _current suffix to only lookup into currently active organizations (ie: mixed_current)",
)
parser.add_argument(
"--identifier-scheme",
dest="identifier_scheme",
default="OCD-URI",
default=None,
help="Which scheme to use with identifier/mixed lookup strategy",
)
parser.add_argument(
"--log-step",
dest="log_step",
type=int,
default=500,
help="Number of steps to log process completion to stdout. Defaults to 500.",
)
def handle(self, *args, **options):
verbosity = options["verbosity"]
......@@ -51,6 +59,7 @@ class Command(BaseCommand):
lookup_strategy = options["lookup_strategy"]
identifier_scheme = options["identifier_scheme"]
source_url = options["source_url"]
log_step = options["log_step"]
self.logger.info("Start records import")
self.logger.info("Reading CSV from url: {0}".format(source_url))
......@@ -63,6 +72,7 @@ class Command(BaseCommand):
update_strategy=update_strategy,
lookup_strategy=lookup_strategy,
identifier_scheme=identifier_scheme,
log_step=log_step
),
log_level=self.logger.level,
)()
......
import logging
from opdmetl import ETL
from taskmanager.utils import LoggingBaseCommand
from project.api_v1.etl.loaders.organizations import PopoloOrgOwnershipLoader
from project.api_v1.etl.extractors import JsonArrayExtractor
class Command(LoggingBaseCommand):
help = "Import Ownerships relationships from a JSON source"
logger = logging.getLogger(__name__)
def add_arguments(self, parser):
parser.add_argument(
dest="source_uri", help="Source of the JSON file (http[s]:// or local path ./...)"
)
parser.add_argument(
dest="original_source",
help="Original source of the data, to add to sources attributes (ex: http://api.atoka.io)"
)
parser.add_argument(
"--update-strategy",
dest="update_strategy",
default="keep_old",
help="Whether to keep old values or to overwrite them (keep_old | overwrite), defaults to keep_old",
)
parser.add_argument(
"--lookup-strategy",
dest="lookup_strategy",
default="identifier",
help="Whether to lookup using mixed (name and start date), "
"identifier (see --identifier-scheme) or mixed (identifier first, then cascade to anagraphical)",
)
parser.add_argument(
"--identifier-scheme",
dest="identifier_scheme",
default=None,
help="Which scheme to use with identifier/mixed lookup strategy. None to use main identifier.",
)
parser.add_argument(
"--log-step",
dest="log_step",
type=int,
default=500,
help="Number of steps to log process completion to stdout. Defaults to 500.",
)
def handle(self, *args, **options):
self.setup_logger(__name__, formatter_key="simple", **options)
self.logger.info("Start ETL process")
update_strategy = options["update_strategy"]
lookup_strategy = options["lookup_strategy"]
identifier_scheme = options["identifier_scheme"]
source_uri = options["source_uri"]
original_source = options["original_source"]
log_step = options["log_step"]
self.logger.info("Extraction from JSON")
ETL(
extractor=JsonArrayExtractor(source_uri),
loader=PopoloOrgOwnershipLoader(
update_strategy=update_strategy,
lookup_strategy=lookup_strategy,
identifier_scheme=identifier_scheme,
log_step=log_step
),
log_level=self.logger.level,
source=original_source
)()
self.logger.info("End")
import csv
from io import StringIO
import logging
import requests
from popolo.models import Classification, Organization
from taskmanager.utils import LoggingBaseCommand
class Command(LoggingBaseCommand):
help = "Corrections for Classifications of Organizations"
logger = logging.getLogger(__name__)
def handle(self, *args, **options):
verbosity = options["verbosity"]
if verbosity == 0:
self.logger.setLevel(logging.ERROR)
elif verbosity == 1:
self.logger.setLevel(logging.WARNING)
elif verbosity == 2:
self.logger.setLevel(logging.INFO)
elif verbosity == 3:
self.logger.setLevel(logging.DEBUG)
self.setup_logger(__name__, formatter_key="simple", **options)
self.logger.info("Start")
# add unknown classifications
Classification.objects.get_or_create(
scheme='FORMA_GIURIDICA_OP',
code='3710',
descr='Forza armata o di ordine pubblico e sicurezza'
)
Classification.objects.filter(
scheme="FORMA_GIURIDICA_OP", code="2830"
).update(descr="Agenzie, enti e consorzi")
definizioni = "fonte;classificazione;forma_giuridica_op;occorrenze\r\n" \
"TIPOLOGIA_SIOPE_BDAP;CAMERE DI COMMERCIO;Camera di commercio\r\n" \
"CATEGORIA_IPA_BDAP;ISTITUZIONI PER L'ALTA FORMAZIONE ARTISTICA, MUSICALE E COREUTICA - AFAM;" \
"Istituto e scuola pubblica di ogni ordine e grado\r\n" \
"CATEGORIA_IPA_BDAP;UNIVERSITA' E ISTITUTI DI ISTRUZIONE UNIVERSITARIA PUBBLICI;Università pubblica\r\n" \
"TIPOLOGIA_DT_BDAP;SOCIETA' CONSORTILE;Società consortile\r\n" \
"TIPOLOGIA_DT_BDAP;SOCIETA' COOPERATIVA;Società cooperativa diversa\r\n" \
"TIPOLOGIA_DT_BDAP;ENTE PUBBLICO ECONOMICO;Ente pubblico economico\r\n" \
"TIPOLOGIA_DT_BDAP;ENTE PUBBLICO NON ECONOMICO;Altro ente pubblico non economico\r\n" \
"TIPOLOGIA_DT_BDAP;AZIENDA SPECIALE E DI ENTE LOCALE;Azienda speciale ai sensi del t.u. 267/2000\r\n" \
"CATEGORIA_IPA_BDAP;ENTI DI REGOLAZIONE DEI SERVIZI IDRICI E O DEI RIFIUTI;" \
"Altro ente pubblico non economico\r\n" \
"TIPOLOGIA_DT_BDAP;CONSORZIO;Consorzio di diritto pubblico\r\n" \
"CATEGORIA_IPA_BDAP;ENTI PUBBLICI PRODUTTORI DI SERVIZI ASSISTENZIALI, RICREATIVI E CULTURALI;" \
"Altro ente pubblico non economico\r\n" \
"CATEGORIA_IPA_BDAP;AZIENDE E CONSORZI PUBBLICI TERRITORIALI PER L'EDILIZIA RESIDENZIALE;" \
"Consorzio di diritto pubblico\r\n" \
"CATEGORIA_IPA_BDAP;ALTRI ENTI LOCALI;Ente di sviluppo agricolo regionale o di altro ente locale\r\n" \
"CATEGORIA_IPA_BDAP;COMUNI E LORO CONSORZI E ASSOCIAZIONI;Agenzie, enti e consorzi\r\n" \
"CATEGORIA_IPA_BDAP;UNIONI DI COMUNI E LORO CONSORZI E ASSOCIAZIONI;Agenzie, enti e consorzi\r\n" \
"CATEGORIA_IPA_BDAP;ALTRI ENTI LOCALI;Agenzie, enti e consorzi\r\n" \
"CATEGORIA_IPA_BDAP;ENTI E ISTITUZIONI DI RICERCA PUBBLICI;Istituto o ente pubblico di ricerca\r\n" \
"CATEGORIA_IPA_BDAP;AGENZIE ED ENTI REGIONALI PER LA FORMAZIONE, LA RICERCA E L'AMBIENTE;" \
"Ente ambientale regionale\r\n" \
"CATEGORIA_IPA_BDAP;CONSORZI DI BACINO IMBRIFERO MONTANO;Consorzio di diritto pubblico\r\n" \
"CATEGORIA_IPA_BDAP;CONSORZI INTERUNIVERSITARI DI RICERCA;Consorzio di diritto pubblico\r\n" \
"CATEGORIA_IPA_BDAP;PRESIDENZA DEL CONSIGLIO DEI MINISTRI, MINISTERI E AVVOCATURA DELLO STATO;" \
"Organo costituzionale o a rilevanza costituzionale\r\n" \
"CATEGORIA_IPA_BDAP;PROVINCE E LORO CONSORZI E ASSOCIAZIONI;Agenzie, enti e consorzi\r\n" \
"CATEGORIA_IPA_BDAP;COMUNITA' MONTANE E LORO CONSORZI E ASSOCIAZIONI;Agenzie, enti e consorzi\r\n" \
"CATEGORIA_IPA_BDAP;FORZE DI POLIZIA AD ORDINAMENTO CIVILE E MILITARE PER LA " \
"TUTELA DELL'ORDINE E DELLA SICUREZZA PUBBLICA;Forza armata o di ordine pubblico e sicurezza\r\n" \
"TIPOLOGIA_DT_BDAP;ENTE DI DIRITTO PUBBLICO;Ente pubblico economico\r\n" \
"TIPOLOGIA_DT_BDAP;ASSOCIAZIONI E FONDAZIONI;Associazione riconosciuta\r\n" \
"CATEGORIA_IPA_BDAP;REGIONI, PROVINCE AUTONOME E LORO CONSORZI E ASSOCIAZIONI;" \
"Agenzie, enti e consorzi\r\n" \
"CATEGORIA_IPA_BDAP;ENTI NAZIONALI DI PREVIDENZA ED ASSISTENZA SOCIALE IN CONTO ECONOMICO CONSOLIDATO;" \
"Altro ente pubblico non economico\r\n" \
"TIPOLOGIA_DT_BDAP;ALTRO;Altra forma giuridica\r\n"
reader = csv.DictReader(StringIO(definizioni), delimiter=';')
for row in reader:
self.logger.info("{0} => {1}".format(row["classificazione"], row["forma_giuridica_op"]))
c = Classification.objects.get(scheme="FORMA_GIURIDICA_OP", descr=row["forma_giuridica_op"])
for o in Organization.objects.filter(
classifications__classification__scheme=row["fonte"],
classifications__classification__descr__iexact=row["classificazione"],
classification__isnull=True
):
o.add_classification_rel(c)
o.classification = c.descr
o.save()
r = requests.get(
"https://s3.eu-central-1.amazonaws.com/opdm-service-data/unclassified_classifications.csv"
)
reader = csv.DictReader(StringIO(r.content.decode("utf8")), delimiter=';')
for row in reader:
self.logger.info("{0} => {1}".format(row["name"], row["classification"]))
try:
c = Classification.objects.get(
scheme="FORMA_GIURIDICA_OP", descr__iexact=row["classification"]
)
except Classification.DoesNotExist:
self.logger.error("Could not find classification for {0}".format(row["classification"]))
continue
o_id = int(row["id"].split("/")[-1])
o = Organization.objects.get(id=o_id)
o.add_classification_rel(c)
o.classification = c.descr
o.save()
self.logger.info("End")
import csv
import logging
import os
from itertools import groupby
from os.path import abspath
from django.core.management import BaseCommand
from popolo.models import Organization
from project.api_v1.core import batch_generator
from project.connections.atoka import AtokaConn, AtokaObjectDoesNotExist
class Command(BaseCommand):
help = (
"Extract all roles from ATOKA, for companies owned by PA institutions"
)
logger = logging.getLogger(__name__)
persons_update_strategy = None
memberships_update_strategy = None
def add_arguments(self, parser):
parser.add_argument(
"--batch-size",
dest="batchsize", type=int,
default=25,
help="Size of the batch of organizations processed at once",
)
parser.add_argument(
"--offset",
dest="offset", type=int,
default=0,
help="Start processing",
)
parser.add_argument(
"--out-path",
dest="outpath",
default="./resources/data/out"
)
def handle(self, *args, **options):
verbosity = options["verbosity"]
if verbosity == 0:
self.logger.setLevel(logging.ERROR)
elif verbosity == 1:
self.logger.setLevel(logging.WARNING)
elif verbosity == 2:
self.logger.setLevel(logging.INFO)
elif verbosity == 3:
self.logger.setLevel(logging.DEBUG)
batchsize = options['batchsize']
offset = options['offset']
outpath = options['outpath']
self.logger.info("Start")
# start filtering current organizations with a tax_id,
# excluding those classified as private
organizations_qs = Organization.objects.filter(
classifications__classification__scheme='FORMA_GIURIDICA_OP'
).current().exclude(
classifications__classification_id__in=[
11, 20, 24, 29, 48, 69, 83, 295, 321, 346, 403, 621, 941, 730, 1182, 1183, 1184, 1185, 1186, 1187,
1188, 1190, 1189, 1191, 1192, 1193, 1194, 1195, 1196, 1197, 1198, 1199, 1200, 1201, 1202
]
).filter(identifier__isnull=False).exclude(
classifications__classification__descr__in=[]
).distinct()
# group organizations by classification counting occurrences
organizations_groups = [
{
'descr': x['classifications__classification__descr'],
'id': x['classifications__classification_id'],
'n': 0
}
for x in list(organizations_qs.values(
'classifications__classification__descr',
'classifications__classification_id'
).distinct())
]
for group in organizations_groups:
group['n'] = organizations_qs.filter(classifications__classification_id=group['id']).count()
organizations_groups = sorted(organizations_groups, key=lambda x: x['n'] * -1)
atoka_conn = AtokaConn()
atoka_roles = atoka_conn.allowed_roles
counter = 0
owned_ids = []
owned_orgs_dict = {}
people_ids = []
people = []
res_doubles = {}
atoka_companies_requests = 0
atoka_people_requests = 0
for organizations_group in organizations_groups:
self.logger.info('processing {0} institutions classified as {1}'.format(
organizations_group['n'], organizations_group['descr']
))
# generate batches of batchsize, to query atoka's endpoint
batches = batch_generator(
batchsize, organizations_qs.filter(
classifications__classification_id=organizations_group['id']
).values_list('identifier', flat=True).distinct().iterator()
)
group_owned_ids = []
group_people_ids = []
group_owned_orgs_dict = {}
group_counter = 0
for tax_ids in batches:
# extract atoka_ownerships into a list in memory,
# in order to use it in the two following ETL procedures
batch_owned_ids = []
batch_counter = 0
# implement offset
if counter >= offset:
# fetch all companies among the list having govType values set
try:
res_tot = atoka_conn.get_companies_from_tax_ids(
tax_ids, packages='base,shares', active='true', batch_size=1,
)
self.logger.info(
"- from {0} tax_ids, fetched info and shares for {1} institutions".format(
len(tax_ids), len(res_tot)
)
)
atoka_companies_requests += len(res_tot)
except AtokaObjectDoesNotExist:
res_tot = []
# build list of atoka_ids per tax_id
for r in res_tot:
if r['base']['taxId'] not in res_doubles:
res_doubles[r['base']['taxId']] = []
res_doubles[r['base']['taxId']].append(r['id'])
# remove owners with no shares
res_tot = list(filter(lambda x: 'shares' in x and 'sharesOwned' in x['shares'], res_tot))
for r in res_tot:
r_owned_ids = [
sho['id']
for sho in filter(
lambda x: x['active'] is True and x['typeOfRight'] == 'proprietà' and 'ratio' in x,
r['shares']['sharesOwned']
)
]
if len(r_owned_ids) == 0:
continue
batch_owned_ids.extend(r_owned_ids)
group_owned_ids.extend(r_owned_ids)
owned_ids.extend(r_owned_ids)
batch_counter += 1
group_counter += 1
counter += 1
owned_orgs = atoka_conn.get_companies_from_atoka_ids(
batch_owned_ids, packages='base', active='true', batch_size=1
)
self.logger.debug("- fetched details for {0} owned companies".format(len(owned_orgs)))
atoka_companies_requests += len(owned_orgs)
for o in owned_orgs:
d = {
'legal_form': [x['name'] for x in o['base']['legalForms'] if x['level'] == 2][0]
}
group_owned_orgs_dict[o['id']] = d
owned_orgs_dict[o['id']] = d
batch_people = atoka_conn.get_roles_from_atoka_ids(
batch_owned_ids, packages='base,companies',
companiesRolesOfficial='true', companiesRoles=atoka_roles
)
self.logger.info("- fetched details and roles for {0} people".format(len(batch_people)))
atoka_people_requests += len(batch_people)
# append people, if not already there
batch_people_ids = []
for person in batch_people:
if person['id'] not in people_ids:
people.append(person)
batch_people_ids.append(person['id'])
group_people_ids.extend(batch_people_ids)
people_ids.extend(batch_people_ids)
self.logger.info("Group {0}: {1} institutions, {2} owned_orgs, {3} companies, {4} people.".format(
organizations_group['descr'], group_counter, len(group_owned_ids),
atoka_companies_requests, atoka_people_requests
))
group_owned_ids = list(set(group_owned_ids))
group_people_ids = list(set(group_people_ids))
self.logger.info("")
msg = "{0} statistics".format(organizations_group['descr'])
self.logger.info(msg)
self.logger.info("="*len(msg))
self.logger.info("{0} institutions searched".format(organizations_group['n']))
self.logger.info("{0} institutions found with shares and processed".format(group_counter))
self.logger.info("{0} active owned organizations".format(len(group_owned_ids)))
self.logger.info("{0} official people in roles".format(len(group_people_ids)))
self.logger.info("")
owned_ids = list(set(owned_ids))
people_ids = list(set(people_ids))
self.logger.info("")
self.logger.info("Total statistics")
self.logger.info("================")
self.logger.info("{0} institutions searched".format(organizations_qs.values('identifier').distinct().count()))
self.logger.info("{0} institutions found with shares and processed".format(counter))
self.logger.info("{0} active owned organizations".format(len(owned_ids)))
self.logger.info("{0} official people in roles".format(len(people_ids)))
# extract roles from people, filtering only allowed roles in owned companies
roles = []
for person in filter(lambda y: 'base' in y and 'gender' in y['base'], people):
for company in filter(lambda y: 'id' in y and y['id'] in owned_ids, person['companies']['items']):
for role in filter(lambda y: 'official' in y, company['roles']):
roles.append(
(role.get('name', '-').lower(), owned_orgs_dict[company['id']]['legal_form'].lower())
)
roles_frequency = {
key: len(list(group)) for key, group in groupby(sorted(roles))
}
roles = sorted(roles_frequency.items(), key=lambda x: x[1] * -1)
with open(os.path.join(outpath, "roles.csv"), "w") as f_roles:
roles_writer = csv.DictWriter(
f_roles, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL,
fieldnames=['name', 'legal_form', 'frequency']
)
roles_writer.writeheader()
for role in roles:
roles_writer.writerow({
'name': role[0][0],
'legal_form': role[0][1],
'frequency': role[1]
})
self.logger.info("Roles written to {0}".format(abspath(f_roles.name)))
res_doubles = {
k: v
for k, v in res_doubles.items() if len(v) > 1
}
with open(os.path.join(outpath, "doubles.csv"), "w") as f_doubles:
doubles_writer = csv.DictWriter(
f_doubles, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL,
fieldnames=['tax_id', 'n', 'atoka_ids']
)
doubles_writer.writeheader()
for k, v in res_doubles.items():
doubles_writer.writerow({
'tax_id': k,
'n': len(v),
'atoka_ids': ",".join(v),
})
self.logger.info("Doubles written to {0}".format(abspath(f_doubles.name)))
self.logger.info("End")
import logging
import pandas as pd
import numpy as np
from django.core.management import BaseCommand
from popolo.models import Classification, RoleType
class Command(BaseCommand):
help = (
"Generate opdm RoleTypes, starting from a CSV file with atoka->opdm mapping"
)
logger = logging.getLogger(__name__)
def add_arguments(self, parser):
parser.add_argument(
"--csv-file",
dest="csvfile",
default="./resources/data/role_types_atoka_opdm.csv",
help="Complete csv filename",
)
def handle(self, *args, **options):
verbosity = options["verbosity"]
if verbosity == 0:
self.logger.setLevel(logging.ERROR)
elif verbosity == 1:
self.logger.setLevel(logging.WARNING)
elif verbosity == 2:
self.logger.setLevel(logging.INFO)
elif verbosity == 3:
self.logger.setLevel(logging.DEBUG)
csvfile = options['csvfile']
self.logger.info("Start")
df = pd.read_csv(csvfile, delimiter=";", dtype=object)
classifications_mapping = None
for n, row in enumerate(df.to_dict('records')):
if n == 0:
classifications_mapping = row
else:
for k, v in classifications_mapping.items():
if v is np.nan:
continue
v = int(v)
if row[k] and v > 0:
rt_label = "{0} {1}".format(
row['role_type_opdm'],
Classification.objects.get(id=v).descr.lower()
)
rt, created = RoleType.objects.get_or_create(
label=rt_label,
classification_id=v
)
if created:
print(rt)
self.logger.info("End")
import logging
from django.db.models import Count
from popolo.models import Organization
from taskmanager.utils import LoggingBaseCommand
class Command(LoggingBaseCommand):
help = "Merge duplicate organizations, removing those created sooner"
logger = logging.getLogger(__name__)
def handle(self, *args, **options):
verbosity = options["verbosity"]
if verbosity == 0:
self.logger.setLevel(logging.ERROR)
elif verbosity == 1:
self.logger.setLevel(logging.WARNING)
elif verbosity == 2:
self.logger.setLevel(logging.INFO)
elif verbosity == 3:
self.logger.setLevel(logging.DEBUG)
self.setup_logger(__name__, formatter_key="simple", **options)
self.logger.info("Start")
double_identifiers = Organization.objects.filter(dissolution_date__isnull=True).values('identifier').annotate(
n=Count('identifier')).order_by('-n').filter(n__gt=1).values_list('identifier', flat=True)
for identifier in double_identifiers:
orgs = Organization.objects.filter(
identifier=identifier, dissolution_date__isnull=True
).order_by("created_at")
self.logger.info(
"{0} => {1}".format(identifier, ", ".join(["{0} ({1})".format(org.id, org.created_at) for org in orgs]))
)
o0 = orgs[0]
o1 = orgs[1]
if o1.name != o0.name:
o1.add_other_name(o0.name)
for c in o0.contact_details.values():
c.pop("id")
try:
r = o1.add_contact_detail(**c)
if r:
self.logger.info(" {0} added".format(r))
except Exception as e:
self.logger.error(" Errore: {0}".format(e))
for i in o0.identifiers.values():
i.pop("id")
try:
r = o1.add_identifier(**i)
if r:
self.logger.info(" {0} added".format(r))
except Exception as e:
self.logger.error(" Errore: {0}".format(e))
for c in o0.classifications.values():
try:
r = o1.add_classification_rel(c["classification_id"])
if r:
self.logger.info(" {0} added".format(r))
except Exception as e:
self.logger.error(" Errore: {0}".format(e))
for i in o0.links.values("link__url", "link__note"):
try:
r = o1.add_link(
url=i.get("link__url"), note=i.get("link__note", "")
)
if r:
self.logger.info("{0} added".format(r))
except Exception as e:
self.logger.error("Errore: {0}".format(e))
for i in o0.sources.values("source__url", "source__note"):
try:
r = o1.add_source(
url=i.get("source__url"), note=i.get("source__note", "")
)
if r:
self.logger.info("{0} added".format(r))
except Exception as e:
self.logger.error("Errore: {0}".format(e))
# remove duplicates after info were copied
o0.delete()
self.logger.info("End")
from unittest.mock import MagicMock
from popolo.models import Organization, Ownership, Membership
from opdmetl import ETL
from tests.factories import OrganizationFactory, ClassificationFactory
from project.api_v1.etl.extractors import AtokaOwnershipsExtractor, ListExtractor
from project.api_v1.etl.loaders.organizations import PopoloOrgLoader, PopoloOrgOwnershipLoader
from project.api_v1.etl.transformations.atoka import AtokaOwnershipOrgTransformation, AtokaOwnershipTransformation, \
AtokaMembershipTransformation
from project.api_v1.tests.etl import SolrETLTest
from project.api_v1.tests.etl.atoka_mocks import get_companies_tax_ids_batch, get_companies_atoka_ids_batch, \
get_roles_atoka_ids_batch
from project.api_v1.tests import faker
from project.connections.atoka import AtokaConn
class AtokaETLTest(SolrETLTest):
"""
"""
@classmethod
def setUpClass(cls):
super(AtokaETLTest, cls).setUpClass()
# # need to st
# getattr(self, 'mock_get_patcher').stop()
# getattr(self, 'mock_post_patcher').stop()
# mock atoka_conn method for cciaa or govTypes results
AtokaConn.get_companies_from_tax_ids = MagicMock(
side_effect=get_companies_tax_ids_batch,
status_code=200,
ok=True
)
AtokaConn.get_companies_from_atoka_ids = MagicMock(
side_effect=get_companies_atoka_ids_batch,
status_code=200,
ok=True
)
AtokaConn.get_roles_from_atoka_ids = MagicMock(
side_effect=get_roles_atoka_ids_batch,
status_code=200,
ok=True
)
@classmethod
def tearDownClass(cls):
super(AtokaETLTest, cls).tearDownClass()
def test_extractor(self):
"""Test exctraction of ownerships works correctly
"""
res = AtokaOwnershipsExtractor(
['02438750586', '00008010803', '00031500945', '00031730948', '00033120437', '00034670943']
).extract()['results']  # extract() returns a dict; the owners list is under 'results'
self.assertEqual(type(res), list)
self.assertEqual(len(res), 6)
c = res[next(i for i, v in enumerate(res) if v['tax_id'] == '00008010803')]
self.assertEqual(c['atoka_id'], 'b248111d6667')
self.assertEqual(len(c['other_atoka_ids']), 0)
self.assertEqual(c['name'], "COMUNE DI CINQUEFRONDI")
self.assertEqual(c['legal_form'], 'Soggetto non iscritto al Registro Imprese')
self.assertEqual(c['rea'], None)
self.assertEqual(c['cciaa'], None)
self.assertEqual('shares_owned' in c, True)
self.assertEqual(len(c['shares_owned']), 1)
sho = c['shares_owned'][0]
self.assertEqual(sho['percentage'], 12.5)
self.assertEqual(sho['atoka_id'], 'c17fceac95f3')
self.assertEqual(sho['last_updated'], "2005-05-20")
self.assertEqual(sho['founded'], "2004-07-14")
self.assertEqual(sho['name'], "CONSORZIO FORESTALE ASPRO-SERRE")
self.assertEqual(sho['rea'], "159715")
self.assertEqual(sho['cciaa'], "RC")
self.assertEqual(sho['tax_id'], "91009990804")
self.assertEqual(sho['legal_form'], "Consorzio")
self.assertEqual('roles' in sho, True)
# test with Rome
c = res[next(i for i, v in enumerate(res) if v['tax_id'] == '02438750586')]
self.assertEqual(len(c['other_atoka_ids']), 1)
self.assertEqual(len(c['shares_owned']), 6)
def test_create_or_update_organizations_and_ownerships(self):
"""Tests that ownerships among orgnizations extracted from ATOKA (owning and owned)
are created anew or updated when existing.
Organizations are also created or updated.
:return:
"""
owning = OrganizationFactory.create(identifier="02438750586", name="Comune di Roma")
for cl_id in [321, 11, 24, 295]:
ClassificationFactory.create(id=cl_id, descr=faker.word())
atoka_ownerships = AtokaOwnershipsExtractor(
['02438750586', '00008010803', '00031500945',
'00031730948', '00033120437', '00034670943']
).extract()['results']  # extract() returns a dict; the owners list is under 'results'
ETL(
extractor=ListExtractor(atoka_ownerships),
transformation=AtokaOwnershipOrgTransformation(),
loader=PopoloOrgLoader(lookup_strategy='identifier', identifier_scheme=None),
log_level=0,
)()
self.assertEqual(Organization.objects.count() > 1, True)
self.assertEqual(owning.name, "Comune di Roma")
self.assertNotEqual(
owning.classifications.filter(classification__scheme='CCIAA').count(), 0
)
ETL(
extractor=ListExtractor(atoka_ownerships),
transformation=AtokaOwnershipTransformation(),
loader=PopoloOrgOwnershipLoader(lookup_strategy='identifier', identifier_scheme=None),
log_level=0,
)()
self.assertEqual(Ownership.objects.count() > 1, True)
def test_create_or_update_organizations_and_memberships(self):
"""Tests that memberships in orgnizations extracted from ATOKA (owning and owned)
are created anew or updated when existing.
Organizations are also created or updated.
:return:
"""
owning = OrganizationFactory.create(identifier="02438750586", name="Comune di Roma")
for cl_id in [321, 11, 24, 295]:
ClassificationFactory.create(id=cl_id, descr=faker.word())
atoka_ownerships = AtokaOwnershipsExtractor(
['02438750586', '00008010803', '00031500945',
'00031730948', '00033120437', '00034670943']
).extract()['results']  # extract() returns a dict; the owners list is under 'results'
ETL(
extractor=ListExtractor(atoka_ownerships),
transformation=AtokaOwnershipOrgTransformation(),
loader=PopoloOrgLoader(lookup_strategy='identifier', identifier_scheme=None),
log_level=0,
)()
self.assertEqual(Organization.objects.count() > 1, True)
self.assertEqual(owning.name, "Comune di Roma")
self.assertNotEqual(
owning.classifications.filter(classification__scheme='CCIAA').count(), 0
)
ETL(
extractor=ListExtractor(atoka_ownerships),
transformation=AtokaMembershipTransformation(),
loader=PopoloOrgOwnershipLoader(lookup_strategy='identifier', identifier_scheme=None),
log_level=0,
)()
self.assertEqual(Membership.objects.count() > 1, True)
import itertools
import json
from io import StringIO
import requests
from django.conf import settings
from popolo.models import Person
from requests_toolbelt import MultipartEncoder
class AtokaConn(object):
......@@ -15,6 +18,14 @@ class AtokaConn(object):
version = settings.ATOKA_API_VERSION
key = settings.ATOKA_API_KEY
allowed_roles = \
"titolare firmatario,amministratore unico,consigliere,socio amministratore,socio accomandante," \
"socio,socio accomandatario,presidente consiglio amministrazione,socio unico,amministratore,titolare," \
"sindaco effettivo,vice presidente consiglio amministrazione,amministratore delegato,liquidatore," \
"sindaco supplente,socio di societa' in nome collettivo,consigliere delegato,presidente," \
"curatore fallimentare,presidente del collegio sindacale,vice presidente,legale rappresentante," \
"revisore dei conti,legale rappresentante di societa',institore,direttore generale"
def get_person_from_tax_id(self, tax_id: str) -> dict:
"""get a single person from ATOKA API, from its tax_id
raise Atoka exceptions if errors or no objects found
......@@ -87,40 +98,105 @@ class AtokaConn(object):
return result['items'][0]
def get_companies_from_tax_ids(self, tax_ids: list, **kwargs) -> list:
"""get all companies from ATOKA API, from given tax_ids list
raise Atoka exceptions if errors or no objects found
:param tax_ids: - the list of tax_ids to extract info from
:param kwargs: - more atoka parameters for filtering results (ex: active='true', cciaa='*')
:return: dict - ATOKA result
def get_items_from_ids(
self, ids: list, item_type: str, ids_field_name: str = 'ids', batch_size: int = 50, **kwargs
) -> list:
"""Transform a request for a list of ids larger than batch_size,
to a batch request of enough rows with a limit of batch_size, so that all results
can be returned.
Results are composed and returned as a list of dicts.
:param ids: list
:param item_type: str
:param ids_field_name: ids, tax_ids
:param batch_size: size of the number of ids searched by row of the batch IO
:param kwargs: - more atoka parameters for filtering results
(ex: packages=base,shares, active='true', ccia='*')
:return: results as a list of dicts
"""
kwargs.pop('taxIds', False) # taxIds are read only from parameter
packages = kwargs.pop('packages', 'base')
req_params = {
'token': self.key,
'taxIds': ",".join(map(str, tax_ids)),
'packages': packages,
'limit': 50
if ids_field_name not in ['ids', 'taxIds', 'companies']:
raise AtokaException("ids_field_name parameter must take one of these values: <ids>, <taxIds>, <companies>")
if batch_size < 1 or batch_size > 50:
raise AtokaException("batch_size must be between 1 and 50")
if item_type not in ['companies', 'people']:
raise AtokaException("item_type must take one of these values: <companies>, <people>")
if len(ids) == 0:
return []
api_endpoint = "{0}/{1}/{2}/".format(
self.service_url, self.version, item_type
)
# internal function to split ids list into chunks
def chunks(lst, size):
"""Yield successive size-sized chunks from lst."""
for i in range(0, len(lst), size):
yield lst[i:i + size]
# build the file-like IO to upload for batch execution
file_io = StringIO()
for n, r in enumerate(chunks(ids, batch_size)):
print(json.dumps({
"reqId": "r{0:05d}".format(n),
ids_field_name: ','.join(r),
}), file=file_io)
file_io.seek(0)  # rewind the buffer so the encoder reads the batch rows from the start
# batch API request
fields = {
'batch': ('batch.json', file_io),
'limit': '50'
}
req_params.update(kwargs)
fields.update(kwargs)
m = MultipartEncoder(
fields=fields
)
response = requests.get(
'{0}/{1}/companies'.format(
self.service_url, self.version
),
params=req_params,
response = requests.post(
api_endpoint,
params={'token': self.key},
data=m,
headers={'Content-Type': m.content_type}
)
# destroy fileIO
file_io.close()
if response is None:
return []
# return response
json_response = response.json()
if not response.ok:
raise AtokaResponseError(response.reason)
result = response.json()
if result['meta']['count'] == 0:
raise AtokaObjectDoesNotExist(
"Could not find results for this request."
)
total_response = []
if 'responses' in json_response:
for r in json_response['responses'].values():
total_response.extend(r['items'])
else:
total_response.extend(json_response['items'])
if len(total_response) == 0:
raise AtokaObjectDoesNotExist(getattr(response, "content", None))
return total_response
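A hedged usage sketch (editor's addition): a request for 120 ids with batch_size=50 becomes a 3-row batch file, while batch_size=1 gives each id its own row, which is what the callers above use with the `shares` package, where a single company may own many shares and 50 is the per-row result limit:

    conn = AtokaConn()
    companies = conn.get_items_from_ids(
        ['02438750586', '00008010803'],  # placeholder tax ids
        'companies', ids_field_name='taxIds',
        packages='base,shares', active='true', batch_size=1,
    )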
def get_companies_from_tax_ids(self, tax_ids: list, **kwargs) -> list:
"""get all companies from ATOKA API, from given tax_ids list
raise Atoka exceptions if errors or no objects found
return result['items']
:param tax_ids: - the list of tax_ids to extract info from
:param kwargs: - more atoka parameters for filtering results (ex: active='true', cciaa='*')
:return: dict - ATOKA result
"""
return self.get_items_from_ids(tax_ids, 'companies', ids_field_name='taxIds', **kwargs)
def get_companies_from_atoka_ids(self, atoka_ids: list, **kwargs) -> list:
"""get all companies from ATOKA API, from given atoka_ids list
......@@ -130,170 +206,38 @@ class AtokaConn(object):
:param kwargs: - more atoka parameters for filtering results (ex: active='true', cciaa='*')
:return: dict - ATOKA result
"""
kwargs.pop('ids', False) # ids are read only from parameter
packages = kwargs.pop('packages', 'base')
req_params = {
'token': self.key,
'ids': ",".join(map(str, atoka_ids)),
'packages': packages,
'limit': 50
}
req_params.update(kwargs)
response = requests.get(
'{0}/{1}/companies'.format(
self.service_url, self.version
),
params=req_params,
)
if not response.ok:
raise AtokaResponseError(response.reason)
return self.get_items_from_ids(atoka_ids, 'companies', ids_field_name='ids', **kwargs)
result = response.json()
if result['meta']['count'] == 0:
raise AtokaObjectDoesNotExist(
"Could not find results for this request."
)
def get_people_from_tax_ids(self, tax_ids: list, **kwargs) -> list:
"""get all people from ATOKA API, from given tax_ids list
raise Atoka exceptions if errors or no objects found
return result['items']
def extract_ownerships_from_tax_ids(self, tax_ids: list) -> list:
"""Given a batch of tax_ids, queries atoka, in order to retrieve all shares.
Ownerships are returned as a dict. The keys are the tax_ids of the owner.
Other details of the owner are stored along at the same level, then the `shares` key
contains all ownerships details.
[
{
'atoka_id': owner_atoka_id,
'tax_id': owner_tax_id,
'other_atoka_ids': [ ... ],
'name': owner_name,
'legal_form': owner_atoka_legal_form_level2,
'rea': owner_rea_id,
'cciaa': owner_cciaa,
'shares_owned': [
{
'name': owned_name,
'founded': owned_founded,
'atoka_id': owned_atoka_id,
'tax_id': owned_tax_id,
'rea': owner_rea_id,
'cciaa': owner_cciaa,
'legal_form': owned_atoka_legal_form_level2,
'ateco': owned_ateco,
'percentage': ratio * 100,
'last_update': share_last_update
},
...
]
},
...
]
:param tax_ids: the batch of tax ids
:return: a dict with ownerships details
:param tax_ids: - the list of tax_ids to extract info from
:param kwargs: - more atoka parameters for filtering results (ex: active='true', cciaa='*')
:return: dict - ATOKA result
"""
return self.get_items_from_ids(tax_ids, 'people', ids_field_name='taxIds', **kwargs)
# fetch all companies among the batch having govType values set
try:
res_gov = self.get_companies_from_tax_ids(
tax_ids, packages='base,shares', active='true', govTypes='*'
)
except AtokaObjectDoesNotExist:
res_gov = []
# fetch all companies among the batch having cciaa values set
try:
res_cc = self.get_companies_from_tax_ids(
tax_ids, packages='base,shares', active='true', cciaa='*'
)
except AtokaObjectDoesNotExist:
res_cc = []
# unite the results in a single list, removing duplicates
res_tot = res_gov + list(filter(lambda x: x['id'] not in [i['id'] for i in res_gov], res_cc))
# return multiple results for single tax_ids, if any
res_doubles = {}
for r in res_tot:
if r['base']['taxId'] not in res_doubles:
res_doubles[r['base']['taxId']] = []
res_doubles[r['base']['taxId']].append(r['id'])
res_doubles = {
k: v
for k, v in res_doubles.items() if len(v) > 1
}
# remove owners with no shares
res_tot = list(filter(lambda x: 'shares' in x, res_tot))
# transform the results into a dict
res_dict = {
r['base']['taxId']: {
'atoka_id': r['id'],
'other_atoka_ids': [
atoka_id for atoka_id in res_doubles[r['base']['taxId']] if atoka_id != r['id']
] if r['base']['taxId'] in res_doubles else [],
'name': r['name'],
'legal_form': [x['name'] for x in r['base']['legalForms'] if x['level'] == 2][0],
'rea': r['base'].get('rea', None),
'cciaa': r['base'].get('cciaa', None),
'shares_owned': [
{
'name': sho['name'],
'last_updated': sho['lastUpdate'],
'atoka_id': sho['id'],
'percentage': sho['ratio'] * 100.
}
for sho in filter(
lambda x: x['active'] is True and x['typeOfRight'] == 'proprietà' and 'ratio' in x,
r['shares']['sharesOwned']
)
]
} for r in res_tot
}
# extract all atoka_ids from shares_owned elements and returns flat list
# then apply list(set(x)) to remove duplicates, if any
owned_atoka_ids = list(set(list(itertools.chain.from_iterable([
[x['atoka_id'] for x in r['shares_owned']]
for r in res_dict.values()
]))))
def get_people_from_atoka_ids(self, atoka_ids: list, **kwargs) -> list:
"""get all people from ATOKA API, from given atoka_ids list
raise Atoka exceptions if errors or no objects found
print("*** - richiesti {0} dettagli aziendali".format(len(owned_atoka_ids)))
:param atoka_ids: - the list of ids to extract info from
:param kwargs: - more atoka parameters for filtering results (ex: active='true', cciaa='*')
:return: dict - ATOKA result
"""
return self.get_items_from_ids(atoka_ids, 'people', ids_field_name='ids', **kwargs)
res_owned = self.get_companies_from_atoka_ids(
owned_atoka_ids, packages='base', active='true'
)
res_owned_dict = {
r['id']: {
'name': r['name'],
'cciaa': r['base'].get('cciaa', None),
'rea': r['base'].get('rea', None),
'tax_id': r['base'].get('taxId', None),
'vat': r['base'].get('vat', None),
'founded': r['base'].get('founded', None),
'legal_form': [x['name'] for x in r['base']['legalForms'] if x['level'] == 2][0],
} for r in res_owned
}
def get_roles_from_atoka_ids(self, atoka_ids: list, **kwargs) -> list:
"""get all people in given companies, used to extract roles
# upgrade res_dict values with details values
for tax_id, owner in res_dict.items():
for owned in owner['shares_owned']:
owned_details = res_owned_dict.get(owned['atoka_id'], None)
if owned_details:
for f in ['name', 'cciaa', 'rea', 'tax_id', 'vat', 'founded', 'legal_form']:
owned[f] = owned_details[f]
else:
print("!!! - {0} richiesto ad atoka, ma non presente nei risultati".format(owned['atoka_id']))
# returns a list
results = []
for tax_id, result in res_dict.items():
result['tax_id'] = tax_id
results.append(result)
return results
:param atoka_ids:
:param kwargs:
:return:
"""
# need a batch_size of 1 because the number of people in a single company can be large,
# and 50 is the maximum limit for a single batch row request
return self.get_items_from_ids(atoka_ids, 'people', ids_field_name='companies', batch_size=1, **kwargs)
class AtokaException(Exception):
......