Skip to content
Commits on Source (3)
......@@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]
## [1.1.0]
### Fixed
- unofficial roles are not imported from ATOKA anymore
### Added
- project.atoka module added, to handle all atoka-related logic and tasks
- OrganizationEconomics model added to atoka module
### Changed
- all atoka-related code moved into project.atoka module
## [1.0.3]
### Changed
......
......@@ -202,6 +202,7 @@ DJANGO_APPS = (
LOCAL_APPS = (
'popolo',
'{0}.api_v1'.format(PROJECT_PACKAGE),
'{0}.atoka'.format(PROJECT_PACKAGE),
'{0}.akas'.format(PROJECT_PACKAGE),
'{0}.scrapy'.format(PROJECT_PACKAGE),
'{0}.checks'.format(PROJECT_PACKAGE),
......
......@@ -3,4 +3,4 @@
Openpolis Data Manager service package (backend)
"""
__version__ = '1.0.3'
__version__ = '1.1.0'
import itertools
import json
from io import BytesIO
from typing import Type, Union
import pandas as pd
import requests
from opdmetl.extractors import CSVExtractor, RemoteExtractor, Extractor
from project.connections.atoka import AtokaConn, AtokaObjectDoesNotExist
class ListExtractor(Extractor):
"""Wraps a list into an Extractor, in order for it to be used in the ETL
......@@ -190,300 +185,3 @@ class UACSVExtractor(CSVExtractor):
)
return df
class AtokaOwnershipsExtractor(Extractor):
""":class:`Extractor` for extractions of ownerships information out of the Atoka API
Uses methods in the `AtokaConn` class, to extract information from atoka api.
"""
def __init__(self, batch: list):
"""Create a new instance of the extractor, to extract info from a batch of companies.
Args:
batch: a list of tax_ids for companies lookup in Atoka
Returns:
instance of a :class:`AtokaOwnershipsExtractor`
"""
self.batch = batch
super().__init__()
def extract(self, **kwargs) -> dict:
"""Extract meaningful information from Atoka.
Given a list of tax_ids in self.batch, queries Atoka API, in order to retrieve oned shares and roles.
Ownerships are returned as a list.
Each element in the list represents an **owner**, and has major identifiers, classifications and
**owned organizations**.
Owned organizations are embedded in the `shares_owned` list.
Each element of `shares_owned` contains identifiers, classifications and
**people having roles** in the organization.
[
{
'atoka_id': owner_atoka_id,
'tax_id': owner_tax_id,
'other_atoka_ids': [ ... ],
'name': owner_name,
'legal_form': owner_atoka_legal_form_level2,
'rea': owner_rea_id,
'cciaa': owner_cciaa,
'shares_owned': [
{
'name': owned_name,
'founded': owned_founded,
'atoka_id': owned_atoka_id,
'tax_id': owned_tax_id,
'rea': owner_rea_id,
'cciaa': owner_cciaa,
'legal_form': owned_atoka_legal_form_level2,
'ateco': owned_ateco,
'percentage': ratio * 100,
'last_update': share_last_update,
'roles': [
{
'person': {
'family_name': familyName,
'given_name': givenName,
'gender': gender,
'birth_date': birthDate,
'birth_location': birthPlace,
'name': name,
'atoka_id':id,
'tax_id': taxId,
},
'label': role_name,
'start_date': role_since
},
...
]
},
...
]
},
...
]
:return: a list with ownerhip and embedded details, with roles
"""
tax_ids = self.batch
atoka_conn = AtokaConn()
atoka_companies_requests = 0
atoka_people_requests = 0
# fetch all companies among the list having govType values set
# will need batch_size=1 here, because shares may contain many results
# and the limit is 50
try:
res_tot = atoka_conn.get_companies_from_tax_ids(
tax_ids, packages='base,shares', active='true', batch_size=1
)
atoka_companies_requests += len(res_tot)
self.logger.debug(
"- da {0} tax_ids, ricevuti da Atoka dettagli e quote per {1} istituzioni "
"(ci possono essere doppioni)".format(
len(tax_ids), len(res_tot)
)
)
except AtokaObjectDoesNotExist:
res_tot = []
# return multiple results for single tax_ids, if any
res_doubles = {}
for r in res_tot:
if r['base']['taxId'] not in res_doubles:
res_doubles[r['base']['taxId']] = []
res_doubles[r['base']['taxId']].append(r['id'])
res_doubles = {
k: v
for k, v in res_doubles.items() if len(v) > 1
}
# remove owners with no shares, or shares to non-active companies
def check_owner_has_shares_in_active_companies(owner_org):
if 'shares' in owner_org and 'sharesOwned' in owner_org['shares']:
properties_to_active = list(filter(
lambda x: x['active'] is True and x['typeOfRight'] == 'proprietà' and 'ratio' in x,
owner_org['shares']['sharesOwned']
))
if len(properties_to_active):
return True
return False
res_tot = list(filter(check_owner_has_shares_in_active_companies, res_tot))
self.logger.debug(
"- {0} istituzioni hanno partecipazioni in aziende attive".format(
len(res_tot)
)
)
# transform the results into a dict
res_dict = {
r['base']['taxId']: {
'atoka_id': r['id'],
'other_atoka_ids': [
atoka_id for atoka_id in res_doubles[r['base']['taxId']] if atoka_id != r['id']
] if r['base']['taxId'] in res_doubles else [],
'name': r['name'],
'legal_form': [x['name'] for x in r['base']['legalForms'] if x['level'] == 2][0],
'rea': r['base'].get('rea', None),
'cciaa': r['base'].get('cciaa', None),
'shares_owned': [
{
'name': sho['name'],
'last_updated': sho['lastUpdate'],
'atoka_id': sho['id'],
'percentage': sho['ratio'] * 100.
}
for sho in filter(
lambda x: x['active'] is True and x['typeOfRight'] == 'proprietà' and 'ratio' in x,
r['shares']['sharesOwned']
)
]
} for r in res_tot
}
# extract all atoka_ids from shares_owned elements and returns flat list
# then apply list(set(x)) to remove duplicates, if any
owned_atoka_ids = list(set(list(itertools.chain.from_iterable([
[x['atoka_id'] for x in r['shares_owned']]
for r in res_dict.values()
]))))
owned_orgs = atoka_conn.get_companies_from_atoka_ids(
owned_atoka_ids, packages='base', active='true', batch_size=1
)
atoka_companies_requests += len(owned_orgs)
self.logger.debug("- ricevuti dettagli per {0} partecipate".format(len(owned_orgs)))
owned_orgs_dict = {
r['id']: {
'name': r['name'],
'cciaa': r['base'].get('cciaa', None),
'rea': r['base'].get('rea', None),
'tax_id': r['base'].get('taxId', None),
'vat': r['base'].get('vat', None),
'founded': r['base'].get('founded', None),
'legal_form': [x['name'] for x in r['base']['legalForms'] if x['level'] == 2][0],
} for r in owned_orgs
}
# extract all people's atoka_ids from res_owned elements and returns flat list, removing duplicates
people = atoka_conn.get_roles_from_atoka_ids(
owned_atoka_ids, packages='base,companies',
companiesRolesOfficial='true', companiesRoles=atoka_conn.allowed_roles
)
atoka_people_requests += len(people)
def extract_birth_place(base: dict) -> Type[Union[str, None]]:
"""
:param base:
:return:
"""
if 'birthPlace' not in base:
return None
if 'municipality' in base['birthPlace']:
return "{0} ({1})".format(
base['birthPlace']['municipality'],
base['birthPlace']['provinceCode']
)
else:
return "{0} ({1})".format(base['birthPlace']['state'], base['birthPlace']['stateCode'])
people_ids = []
people_dict = {}
for person in people:
if (
'base' in person and
'gender' in person['base'] and
'familyName' in person['base'] and
'givenName' in person['base'] and
person['id'] not in people_ids
):
people_ids.append(person['id'])
people_dict[person['id']] = {
'given_name': person['base']['givenName'],
'family_name': person['base']['familyName'],
'gender': person['base']['gender'],
'birth_date': person['base'].get('birthDate', None),
'birth_location': extract_birth_place(person['base']),
'tax_id': person['base'].get('taxId', None),
'companies': [
{
'id': x['id'],
'roles': [
xr for xr in x['roles']
if 'official' in xr and 'name' in xr
]
}
for x in person['companies']['items'] if x['id'] in owned_atoka_ids
]
}
self.logger.debug(
"- ricevuti dettagli per {0} persone ({1} distinte)".format(
len(people), len(people_ids)
)
)
# upgrade owned_orgs_dict with roles, from people_dict
for atoka_id, person in people_dict.items():
person['atoka_id'] = atoka_id
for company in person.pop('companies', []):
org = owned_orgs_dict.get(company['id'])
if org:
if 'roles' not in org:
org['roles'] = []
org['roles'].extend([
{
'person': person,
'label': org_role['name'],
'start_date': org_role.get('since', None)
}
for org_role in company['roles']
])
else:
self.logger.warning(
"! azienda {0} richiesta ad atoka nel calcolo dei ruoli per {1}, "
"ma non presente nei risultati".format(company['id'], person['atoka_id'])
)
# upgrade res_dict values with details values
for tax_id, owner in res_dict.items():
for owned in owner['shares_owned']:
owned_details = owned_orgs_dict.get(owned['atoka_id'], None)
if owned_details:
for f in ['name', 'cciaa', 'rea', 'tax_id', 'vat', 'founded', 'legal_form', 'roles']:
if f in owned_details:
owned[f] = owned_details[f]
else:
self.logger.warning(
"! organizzazione {0} richiesta ad atoka, "
"ma non presente nei risultati".format(owned['atoka_id'])
)
# returns a list
results = []
for tax_id, result in res_dict.items():
result['tax_id'] = tax_id
results.append(result)
return {
'meta': {
'atoka_requests': {
'people': atoka_people_requests,
'companies': atoka_companies_requests
},
'ids': {
'people': people_ids,
'companies': owned_atoka_ids
}
},
'results': results
}
import json
from io import StringIO
import requests
from django.conf import settings
from popolo.models import Person
from requests_toolbelt import MultipartEncoder
class AtokaConn(object):
"""Helper class to perform queries on ATOKA api service.
Configuration values are secret and must be kept safe in environment varables.
"""
service_url = settings.ATOKA_API_ENDPOINT
version = settings.ATOKA_API_VERSION
key = settings.ATOKA_API_KEY
allowed_roles = \
"titolare firmatario,amministratore unico,consigliere,socio amministratore,socio accomandante," \
"socio,socio accomandatario,presidente consiglio amministrazione,socio unico,amministratore,titolare," \
"sindaco effettivo,vice presidente consiglio amministrazione,amministratore delegato,liquidatore," \
"sindaco supplente,socio di societa' in nome collettivo,consigliere delegato,presidente," \
"curatore fallimentare,presidente del collegio sindacale,vice presidente,legale rappresentante," \
"revisore dei conti,legale rappresentante di societa',institore,direttore generale"
def get_person_from_tax_id(self, tax_id: str) -> dict:
"""get a single person from ATOKA API, from its tax_id
raise Atoka exceptions if errors or no objects found
:param tax_id: string - the tax_id as a string
:return: dict - ATOKA result
"""
response = requests.get(
'{0}/{1}/people'.format(
self.service_url, self.version
),
params={
'token': self.key,
'taxIds': tax_id,
'packages': 'base,companies,shares'
}
)
if not response.ok:
raise AtokaResponseError(response.reason)
result = response.json()
if result['meta']['count'] == 0:
raise AtokaObjectDoesNotExist(
"Could not find person with tax_id {0} in Atoka.".format(tax_id)
)
if result['meta']['count'] > 1:
raise AtokaMultipleObjectsReturned(
"Found more than one person with tax_id {0} in Atoka.".format(tax_id)
)
return result['items'][0]
def search_person(self, person: Person) -> dict:
"""get a single person from ATOKA API, from its tax_id
raise Atoka exceptions if errors or no objects found
:param person: Person - instance of OPDM person to look for into ATOKA
:return: dict - atoka result
"""
params = {
'token': self.key,
'givenName': person.given_name,
'familyName': person.family_name,
'birthDateFrom': person.birth_date,
'birthDateTo': person.birth_date,
'birtPlaceMunicipalities': person.birth_location_area.name,
'packages': 'base,companies,shares'
}
response = requests.get(
'{0}/{1}/people'.format(
self.service_url, self.version
),
params=params
)
if not response.ok:
raise AtokaResponseError(response.reason)
params.pop('token')
params.pop('packages')
result = response.json()
if result['meta']['count'] == 0:
raise AtokaObjectDoesNotExist(
"Could not find person with parameters {0} in Atoka.".format(params)
)
if result['meta']['count'] > 1:
raise AtokaMultipleObjectsReturned(
"Found more than one person with parameters {0} in Atoka.".format(params)
)
return result['items'][0]
def get_items_from_ids(
self, ids: list, item_type: str, ids_field_name: str = 'ids', batch_size: int = 50, **kwargs
) -> list:
"""Transform a request for a list of ids larger than batch_size,
to a batch request of enough rows with a limit of batch_size, so that all results
can be returned.
Results are composed and returned as a list of dicts.
:param ids: list
:param item_type: str
:param ids_field_name: ids, tax_ids
:param batch_size: size of the number of ids searched by row of the batch IO
:param kwargs: - more atoka parameters for filtering results
(ex: packages=base,shares, active='true', ccia='*')
:return: results as a list of dicts
"""
if ids_field_name not in ['ids', 'taxIds', 'companies']:
raise AtokaException("ids_field_name parameter must take one of these values: <ids>, <taxIds>, <companies>")
if batch_size < 1 or batch_size > 50:
raise AtokaException("batch_size must be between 1 and 50")
if item_type not in ['companies', 'people']:
raise AtokaException("item_type must take one of these values: <companies>, <people>")
if len(ids) == 0:
return []
api_endpoint = "{0}/{1}/{2}/".format(
self.service_url, self.version, item_type
)
# internal function to split ids list into chunks
def chunks(lst, size):
"""Yield successive size-sized chunks from lst."""
for i in range(0, len(lst), size):
yield lst[i:i + size]
# build fileIO to upload form batch execution
file_io = StringIO()
for n, r in enumerate(chunks(ids, batch_size)):
print(json.dumps({
"reqId": "r{0:05d}".format(n),
ids_field_name: ','.join(r),
}), file=file_io)
# batch API request
fields = {
'batch': ('batch.json', file_io),
'limit': '50'
}
fields.update(kwargs)
m = MultipartEncoder(
fields=fields
)
response = requests.post(
api_endpoint,
params={'token': self.key},
data=m,
headers={'Content-Type': m.content_type}
)
# destroy fileIO
file_io.close()
if response is None:
return []
# return response
json_response = response.json()
if not response.ok:
raise AtokaResponseError(response.reason)
total_response = []
if 'responses' in json_response:
for r in json_response['responses'].values():
total_response.extend(r['items'])
else:
total_response.extend(json_response['items'])
if len(total_response) == 0:
raise AtokaObjectDoesNotExist(getattr(response, "content", None))
return total_response
def get_companies_from_tax_ids(self, tax_ids: list, **kwargs) -> list:
"""get all companies from ATOKA API, from given tax_ids list
raise Atoka exceptions if errors or no objects found
:param tax_ids: - the list of tax_ids to extract info from
:param kwargs: - more atoka parameters for filtering results (ex: active='true', ccia='*')
:return: dict - ATOKA result
"""
return self.get_items_from_ids(tax_ids, 'companies', ids_field_name='taxIds', **kwargs)
def get_companies_from_atoka_ids(self, atoka_ids: list, **kwargs) -> list:
"""get all companies from ATOKA API, from given atoka_ids list
raise Atoka exceptions if errors or no objects found
:param atoka_ids: - the list of ids to extract info from
:param kwargs: - more atoka parameters for filtering results (ex: active='true', ccia='*')
:return: dict - ATOKA result
"""
return self.get_items_from_ids(atoka_ids, 'companies', ids_field_name='ids', **kwargs)
def get_people_from_tax_ids(self, tax_ids: list, **kwargs) -> list:
"""get all people from ATOKA API, from given tax_ids list
raise Atoka exceptions if errors or no objects found
:param tax_ids: - the list of tax_ids to extract info from
:param kwargs: - more atoka parameters for filtering results (ex: active='true', ccia='*')
:return: dict - ATOKA result
"""
return self.get_items_from_ids(tax_ids, 'people', ids_field_name='taxIds', **kwargs)
def get_people_from_atoka_ids(self, atoka_ids: list, **kwargs) -> list:
"""get all people from ATOKA API, from given atoka_ids list
raise Atoka exceptions if errors or no objects found
:param atoka_ids: - the list of ids to extract info from
:param kwargs: - more atoka parameters for filtering results (ex: active='true', ccia='*')
:return: dict - ATOKA result
"""
return self.get_items_from_ids(atoka_ids, 'people', ids_field_name='ids', **kwargs)
def get_roles_from_atoka_ids(self, atoka_ids: list, **kwargs) -> list:
"""get all people in given companies, used to extract roles
:param atoka_ids:
:param kwargs:
:return:
"""
# need a batch_size of 1 because the number of people in a single company can be great,
# and 50 is the maximum limit for a single batch row request
return self.get_items_from_ids(atoka_ids, 'people', ids_field_name='companies', batch_size=1, **kwargs)
class AtokaException(Exception):
pass
class AtokaObjectDoesNotExist(AtokaException):
pass
class AtokaMultipleObjectsReturned(AtokaException):
pass
class AtokaResponseError(AtokaException):
pass
import itertools
from typing import Type, Union
from opdmetl.extractors import Extractor
from project.connections.atoka import AtokaConn, AtokaObjectDoesNotExist
class AtokaOwnershipsExtractor(Extractor):
""":class:`Extractor` for extractions of ownerships information out of the Atoka API
Uses methods in the `AtokaConn` class, to extract information from atoka api.
"""
def __init__(self, batch: list):
"""Create a new instance of the extractor, to extract info from a batch of companies.
Args:
batch: a list of tax_ids for companies lookup in Atoka
Returns:
instance of a :class:`AtokaOwnershipsExtractor`
"""
self.batch = batch
super().__init__()
def extract(self, **kwargs) -> dict:
"""Extract meaningful information from Atoka.
Given a list of tax_ids in self.batch, queries Atoka API, in order to retrieve oned shares and roles.
Ownerships are returned as a list.
Each element in the list represents an **owner**, and has major identifiers, classifications and
**owned organizations**.
Owned organizations are embedded in the `shares_owned` list.
Each element of `shares_owned` contains identifiers, classifications and
**people having roles** in the organization.
[
{
'atoka_id': owner_atoka_id,
'tax_id': owner_tax_id,
'other_atoka_ids': [ ... ],
'name': owner_name,
'legal_form': owner_atoka_legal_form_level2,
'rea': owner_rea_id,
'cciaa': owner_cciaa,
'shares_owned': [
{
'name': owned_name,
'founded': owned_founded,
'atoka_id': owned_atoka_id,
'tax_id': owned_tax_id,
'rea': owner_rea_id,
'cciaa': owner_cciaa,
'legal_form': owned_atoka_legal_form_level2,
'ateco': owned_ateco,
'percentage': ratio * 100,
'last_update': share_last_update,
'roles': [
{
'person': {
'family_name': familyName,
'given_name': givenName,
'gender': gender,
'birth_date': birthDate,
'birth_location': birthPlace,
'name': name,
'atoka_id':id,
'tax_id': taxId,
},
'label': role_name,
'start_date': role_since
},
...
]
},
...
]
},
...
]
:return: a list with ownerhip and embedded details, with roles
"""
tax_ids = self.batch
atoka_conn = AtokaConn()
atoka_companies_requests = 0
atoka_people_requests = 0
# fetch all companies among the list having govType values set
# will need batch_size=1 here, because shares may contain many results
# and the limit is 50
try:
res_tot = atoka_conn.get_companies_from_tax_ids(
tax_ids, packages='base,shares', active='true', batch_size=1
)
atoka_companies_requests += len(res_tot)
self.logger.debug(
"- da {0} tax_ids, ricevuti da Atoka dettagli e quote per {1} istituzioni "
"(ci possono essere doppioni)".format(
len(tax_ids), len(res_tot)
)
)
except AtokaObjectDoesNotExist:
res_tot = []
# return multiple results for single tax_ids, if any
res_doubles = {}
for r in res_tot:
if r['base']['taxId'] not in res_doubles:
res_doubles[r['base']['taxId']] = []
res_doubles[r['base']['taxId']].append(r['id'])
res_doubles = {
k: v
for k, v in res_doubles.items() if len(v) > 1
}
# remove owners with no shares, or shares to non-active companies
def check_owner_has_shares_in_active_companies(owner_org):
if 'shares' in owner_org and 'sharesOwned' in owner_org['shares']:
properties_to_active = list(filter(
lambda x: x['active'] is True and x['typeOfRight'] == 'proprietà' and 'ratio' in x,
owner_org['shares']['sharesOwned']
))
if len(properties_to_active):
return True
return False
res_tot = list(filter(check_owner_has_shares_in_active_companies, res_tot))
self.logger.debug(
"- {0} istituzioni hanno partecipazioni in aziende attive".format(
len(res_tot)
)
)
# transform the results into a dict
res_dict = {
r['base']['taxId']: {
'atoka_id': r['id'],
'other_atoka_ids': [
atoka_id for atoka_id in res_doubles[r['base']['taxId']] if atoka_id != r['id']
] if r['base']['taxId'] in res_doubles else [],
'name': r['name'],
'legal_form': [x['name'] for x in r['base']['legalForms'] if x['level'] == 2][0],
'rea': r['base'].get('rea', None),
'cciaa': r['base'].get('cciaa', None),
'shares_owned': [
{
'name': sho['name'],
'last_updated': sho['lastUpdate'],
'atoka_id': sho['id'],
'percentage': sho['ratio'] * 100.
}
for sho in filter(
lambda x: x['active'] is True and x['typeOfRight'] == 'proprietà' and 'ratio' in x,
r['shares']['sharesOwned']
)
]
} for r in res_tot
}
# extract all atoka_ids from shares_owned elements and returns flat list
# then apply list(set(x)) to remove duplicates, if any
owned_atoka_ids = list(set(list(itertools.chain.from_iterable([
[x['atoka_id'] for x in r['shares_owned']]
for r in res_dict.values()
]))))
owned_orgs = atoka_conn.get_companies_from_atoka_ids(
owned_atoka_ids, packages='base', active='true', batch_size=1
)
atoka_companies_requests += len(owned_orgs)
self.logger.debug("- ricevuti dettagli per {0} partecipate".format(len(owned_orgs)))
owned_orgs_dict = {
r['id']: {
'name': r['name'],
'cciaa': r['base'].get('cciaa', None),
'rea': r['base'].get('rea', None),
'tax_id': r['base'].get('taxId', None),
'vat': r['base'].get('vat', None),
'founded': r['base'].get('founded', None),
'legal_form': [x['name'] for x in r['base']['legalForms'] if x['level'] == 2][0],
} for r in owned_orgs
}
# extract all people's atoka_ids from res_owned elements and returns flat list, removing duplicates
people = atoka_conn.get_roles_from_atoka_ids(
owned_atoka_ids, packages='base,companies',
companiesRolesOfficial='true', companiesRoles=atoka_conn.allowed_roles
)
atoka_people_requests += len(people)
def extract_birth_place(base: dict) -> Type[Union[str, None]]:
"""
:param base:
:return:
"""
if 'birthPlace' not in base:
return None
if 'municipality' in base['birthPlace']:
return "{0} ({1})".format(
base['birthPlace']['municipality'],
base['birthPlace']['provinceCode']
)
else:
return "{0} ({1})".format(base['birthPlace']['state'], base['birthPlace']['stateCode'])
people_ids = []
people_dict = {}
for person in people:
if (
'base' in person and
'gender' in person['base'] and
'familyName' in person['base'] and
'givenName' in person['base'] and
person['id'] not in people_ids
):
people_ids.append(person['id'])
people_dict[person['id']] = {
'given_name': person['base']['givenName'],
'family_name': person['base']['familyName'],
'gender': person['base']['gender'],
'birth_date': person['base'].get('birthDate', None),
'birth_location': extract_birth_place(person['base']),
'tax_id': person['base'].get('taxId', None),
'companies': [
{
'id': x['id'],
'roles': [
xr for xr in x['roles']
if 'official' in xr and xr['official'] is True and 'name' in xr
]
}
for x in person['companies']['items'] if x['id'] in owned_atoka_ids
]
}
self.logger.debug(
"- ricevuti dettagli per {0} persone ({1} distinte)".format(
len(people), len(people_ids)
)
)
# upgrade owned_orgs_dict with roles, from people_dict
for atoka_id, person in people_dict.items():
person['atoka_id'] = atoka_id
for company in person.pop('companies', []):
org = owned_orgs_dict.get(company['id'])
if org:
if 'roles' not in org:
org['roles'] = []
org['roles'].extend([
{
'person': person,
'label': org_role['name'],
'start_date': org_role.get('since', None)
}
for org_role in company['roles']
])
else:
self.logger.warning(
"! azienda {0} richiesta ad atoka nel calcolo dei ruoli per {1}, "
"ma non presente nei risultati".format(company['id'], person['atoka_id'])
)
# upgrade res_dict values with details values
for tax_id, owner in res_dict.items():
for owned in owner['shares_owned']:
owned_details = owned_orgs_dict.get(owned['atoka_id'], None)
if owned_details:
for f in ['name', 'cciaa', 'rea', 'tax_id', 'vat', 'founded', 'legal_form', 'roles']:
if f in owned_details:
owned[f] = owned_details[f]
else:
self.logger.warning(
"! organizzazione {0} richiesta ad atoka, "
"ma non presente nei risultati".format(owned['atoka_id'])
)
# returns a list
results = []
for tax_id, result in res_dict.items():
result['tax_id'] = tax_id
results.append(result)
return {
'meta': {
'atoka_requests': {
'people': atoka_people_requests,
'companies': atoka_companies_requests
},
'ids': {
'people': people_ids,
'companies': owned_atoka_ids
}
},
'results': results
}
# -*- coding: utf-8 -*-
import json
from popolo.models import Organization
from taskmanager.utils import LoggingBaseCommand
from project.api_v1.core import batch_generator
from project.api_v1.etl.extractors import AtokaEconomicsExtractor
class Command(LoggingBaseCommand):
help = "Extract economics info from ATOKA's API and store them as an array into a local JSON file"
def add_arguments(self, parser):
parser.add_argument(
"--batch-size",
dest="batchsize", type=int,
default=25,
help="Size of the batch of organizations processed at once",
)
parser.add_argument(
"--offset",
dest="offset", type=int,
default=0,
help="Start processing",
)
parser.add_argument(
"--json-file",
dest="jsonfile",
default="./resources/out/atoka.json",
help="Complete path to json file"
)
def handle(self, *args, **options):
self.setup_logger(__name__, formatter_key='simple', **options)
batchsize = options['batchsize']
offset = options['offset']
jsonfile = options['jsonfile']
self.logger.info("Start procedure")
# start filtering current organizations with a tax_id,
# excluding those classified as private
organizations_qs = Organization.objects.filter(
identifiers__scheme='ATOKA_ID'
).current()
atoka_records = []
atoka_companies_requests = 0
atoka_people_requests = 0
people_ids = []
owned_ids = []
counter = 0
# generate batches of batchsize, to query atoka's endpoint
batches = batch_generator(
batchsize, organizations_qs.values_list('identifier', flat=True).distinct().iterator()
)
group_counter = 0
for tax_ids_batch in batches:
# extract economics information for organizations from ATOKA
# implement offset
if counter >= offset:
atoka_extractor = AtokaEconomicsExtractor(tax_ids_batch)
atoka_extractor.logger = self.logger
atoka_res = atoka_extractor.extract()
atoka_records.extend(atoka_res['results'])
atoka_companies_requests += atoka_res['meta']['atoka_requests']['companies']
atoka_people_requests += atoka_res['meta']['atoka_requests']['people']
people_ids.extend(atoka_res['meta']['ids']['people'])
people_ids = list(set(people_ids))
owned_ids.extend(atoka_res['meta']['ids']['companies'])
owned_ids = list(set(owned_ids))
group_counter += len(tax_ids_batch)
self.logger.info("{0} tax_ids, {1} partecipate, {2} persone --------".format(
group_counter, len(owned_ids), len(people_ids)
))
else:
self.logger.info("skipping {0} tax_ids".format(
len(tax_ids_batch)
))
self.logger.debug("")
counter += len(tax_ids_batch)
self.logger.info(
"crediti spesi con atoka: {0} companies, {1} people".format(
atoka_companies_requests, atoka_people_requests
)
)
# produce the json file
self.logger.info("Dati scritti in {0}".format(jsonfile))
with open(jsonfile, "w") as f:
json.dump(atoka_records, f)
......@@ -5,7 +5,7 @@ from popolo.models import Organization
from taskmanager.utils import LoggingBaseCommand
from project.api_v1.core import batch_generator
from project.api_v1.etl.extractors import AtokaOwnershipsExtractor
from project.atoka.etl.extractors import AtokaOwnershipsExtractor
class Command(LoggingBaseCommand):
......
......@@ -6,7 +6,7 @@ from opdmetl.loaders import JsonLoader
from taskmanager.utils import LoggingBaseCommand
from project.api_v1.etl.extractors import JsonArrayExtractor, ListExtractor
from project.api_v1.etl.transformations.atoka import \
from project.atoka.etl.transformations import \
AtokaOwnershipTransformation, \
AtokaMembershipTransformation, \
AtokaOrganizationTransformation
......
# -*- coding: utf-8 -*-
# Generated by Django 1.11.15 on 2019-02-04 15:22
from __future__ import unicode_literals
import django.core.validators
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
('popolo', '0016_auto_20190117_0900'),
]
operations = [
migrations.CreateModel(
name='OrganizationEconomics',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('year', models.PositiveIntegerField(help_text='Year of validity of economics indicator', validators=[django.core.validators.MinValueValidator(2000), django.core.validators.MaxValueValidator(2019)])),
('n_employees', models.PositiveIntegerField(help_text='Nymber of employees for current year')),
('revenue', models.PositiveIntegerField(help_text='Revenue for current year')),
('revenue_trend', models.FloatField(help_text='Revenue trend with respect to previous year', validators=[django.core.validators.MinValueValidator(0.0), django.core.validators.MaxValueValidator(2019)])),
('organization', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='popolo.Organization')),
],
),
]
from datetime import datetime
from django.core.validators import MinValueValidator, MaxValueValidator
from django.db import models
from django.utils.translation import ugettext_lazy as _
from popolo.models import Organization
current_year = datetime.now().year
class OrganizationEconomics(models.Model):
organization = models.ForeignKey(
Organization,
on_delete=models.CASCADE,
)
year = models.PositiveIntegerField(
validators=[MinValueValidator(2000), MaxValueValidator(current_year)],
help_text=_("Year of validity of economics indicator")
)
n_employees = models.PositiveIntegerField(
help_text=_("Nymber of employees for current year")
)
revenue = models.PositiveIntegerField(
help_text=_("Revenue for current year")
)
revenue_trend = models.FloatField(
validators=[MinValueValidator(0.), MaxValueValidator(current_year)],
help_text=_("Revenue trend with respect to previous year")
)
This diff is collapsed.
[bumpversion]
current_version = 1.0.3
current_version = 1.1.0
commit = True
tag = True
tag_name = v{new_version}
......