In [1]:
import os
import pandas as pd
import geopandas as gpd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from shapely import wkt
# Configurazione stile grafici
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_palette('viridis')
In [2]:
ilab_path = os.path.join(os.path.expanduser('~'),'ILAB_DATA')
work_path = os.path.join(ilab_path,'PATRIMONIO','DATA')
out_path = os.path.join(ilab_path,'PATRIMONIO','OUT')
In [3]:
resolution = 8
Caricamento dei dati¶
In [4]:
file_path = os.path.join(out_path, f'grid_{resolution:02}_adv.csv')
# Caricamento dei dati
df = pd.read_csv(file_path)
print(f"Dataset caricato con {df.shape[0]} righe e {df.shape[1]} colonne")
# Mostro le prime righe
df.head()
Dataset caricato con 2286 righe e 336 colonne
Out[4]:
| geometry | h3_08 | cell_a_m2 | CLC||Agricultural | CLC||Artificial, non-agricultural vegetated | CLC||Forest and semi natural | CLC||Industrial, commercial and transport | CLC||Mine, dump and construction | CLC||Urban | CLC||Water bodies | ... | sanita_ospedale_transit||transit__120__ospedale | sanita_ospedale_transit||transit__120__A.O. INTEGRATA CON IL SSN | sanita_ospedale_transit||transit__120__A.O. INTEGRATA CON L'UNIVERSITA' | sanita_ospedale_transit||transit__120__AZIENDA OSPEDALIERA | sanita_ospedale_transit||transit__120__IRCCS FONDAZIONE | sanita_ospedale_transit||transit__120__IRCCS PRIVATO | sanita_ospedale_transit||transit__120__IRCCS PUBBLICO | sanita_ospedale_transit||transit__120__OSPED. A GESTIONE DIRETTA PRESIDIO A.S.L. | sanita_ospedale_transit||transit__120__OSPED. CLASSIF. O ASSIMIL. ART 1 L.132/1968 | sanita_ospedale_transit||transit__120__POLICLINICO UNIVERSITARIO PRIVATO | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | POLYGON ((778248.9610496003 4640757.536709883,... | 881e805327fffff | 789678.181136 | 554587.539667 | 0.0 | 0.0 | 0.0 | 8439.846982 | 226650.794486 | 0.000000 | ... | 21.0 | 1.0 | 0.0 | 2.0 | 2.0 | 2.0 | 1.0 | 5.0 | 7.0 | 1.0 |
| 1 | POLYGON ((780507.5911791052 4630136.4950892795... | 881e805ae7fffff | 790737.567794 | 512453.371816 | 0.0 | 0.0 | 0.0 | 0.000000 | 278284.195977 | 0.000000 | ... | 17.0 | 1.0 | 0.0 | 2.0 | 2.0 | 1.0 | 1.0 | 4.0 | 5.0 | 1.0 |
| 2 | POLYGON ((779036.3155420923 4626205.889196136,... | 881e805803fffff | 790949.982986 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.000000 | 790949.982986 | 0.000000 | ... | 19.0 | 1.0 | 0.0 | 2.0 | 2.0 | 2.0 | 1.0 | 4.0 | 6.0 | 1.0 |
| 3 | POLYGON ((772630.2540061506 4624140.133895666,... | 881e80594bfffff | 790625.764584 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.000000 | 0.000000 | 790625.764584 | ... | 7.0 | 0.0 | 0.0 | 2.0 | 1.0 | 0.0 | 1.0 | 1.0 | 1.0 | 1.0 |
| 4 | POLYGON ((778925.7340115361 4653302.784000916,... | 881e80c9dbfffff | 788677.527561 | 788677.527561 | 0.0 | 0.0 | 0.0 | 0.000000 | 0.000000 | 0.000000 | ... | 17.0 | 1.0 | 1.0 | 2.0 | 1.0 | 2.0 | 1.0 | 3.0 | 5.0 | 1.0 |
5 rows × 336 columns
In [5]:
num_cols = df.select_dtypes(include=['int64', 'float64']).columns
for col in num_cols:
df[col] = df[col].fillna(0)
In [6]:
# Conversione in GeoDataFrame se c'è una colonna geometry
if 'geometry' in df.columns:
try:
df['geometry'] = df['geometry'].apply(wkt.loads)
gdf = gpd.GeoDataFrame(df, geometry='geometry')
print("Dati convertiti in formato geospaziale")
except Exception as e:
print(f"Errore nella conversione in GeoDataFrame: {e}")
gdf = df
else:
print("Colonna 'geometry' non trovata, utilizzo DataFrame standard")
gdf = df
Dati convertiti in formato geospaziale
Analisi delle colonne disponibili per tipologia¶
In [7]:
# Esplorazione dei dati
# Esaminiamo i prefissi delle colonne per comprendere meglio il dataset
prefixes = [col.split('||')[0] if '||' in col else col.split('_')[0] for col in df.columns]
prefix_counts = pd.Series(prefixes).value_counts()
print("Prefissi delle colonne nel dataset:")
print(prefix_counts)
# Visualizziamo le colonne demografiche (ISTAT) e di reddito
demo_cols = [col for col in df.columns if 'ISTAT||' in col and '__21' in col]
income_cols = [col for col in df.columns if 'reddito_pro_capite' in col]
access_cols = [col for col in df.columns if 'ovm_infr' in col and 'traffic__15' in col]
land_cols = [col for col in df.columns if 'CLC||' in col]
print("\nColonne demografiche 2021:", demo_cols)
print("\nColonne reddito:", income_cols)
print("\nColonne accessibilità (15 min):", access_cols[:5], "..." if len(access_cols) > 5 else "")
print("\nColonne uso del suolo:", land_cols)
Prefissi delle colonne nel dataset: ovm_places_istruzione 64 sanita_ospedale 40 ovm_places_sport 36 ISTAT 33 ovm_places_cultura 24 ovm_places_istruzione_transit 22 ovm_places_sanita 20 sanita_ospedale_transit 20 ovm_infr_infrastrutture 16 ovm_places_ristorazione 16 ovm_places_strutture ricettive 12 CLC 10 OMI_vanivuoti 7 beni_immobili_erp 4 OMI 4 airbnb_airbnb 3 reddito_pro_capite 2 h3 1 cell 1 geometry 1 Name: count, dtype: int64 Colonne demografiche 2021: ['ISTAT||POP_F__21', 'ISTAT||POP_M__21', 'ISTAT||POP_TOT__21', 'ISTAT||P_0_14__21', 'ISTAT||P_15_19__21', 'ISTAT||P_20_39__21', 'ISTAT||P_40_64__21', 'ISTAT||P_65_99__21', 'ISTAT||POP_TOT__21__11'] Colonne reddito: ['reddito_pro_capite||redd_cell', 'reddito_pro_capite||redd_pc'] Colonne accessibilità (15 min): ['ovm_infr_infrastrutture||traffic__15__total_infrastrutture', 'ovm_infr_infrastrutture||traffic__15__bus', 'ovm_infr_infrastrutture||traffic__15__stazione', 'ovm_infr_infrastrutture||traffic__15__aeroporto'] Colonne uso del suolo: ['CLC||Agricultural', 'CLC||Artificial, non-agricultural vegetated', 'CLC||Forest and semi natural', 'CLC||Industrial, commercial and transport', 'CLC||Mine, dump and construction', 'CLC||Urban', 'CLC||Water bodies', 'CLC||LABEL', 'CLC||int_a_m2', 'CLC||perc']
In [8]:
# Pulizia e preparazione dei dati
# Convertiamo le colonne demografiche, reddito e accessibilità a numeriche
for col in demo_cols + income_cols + access_cols + land_cols:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
# Calcoliamo la popolazione totale 2021 per cella
if 'ISTAT||POP_TOT__21' in df.columns:
print(f"Popolazione totale 2021 disponibile direttamente")
else:
# Se non disponibile direttamente, sommiamo le componenti demografiche
pop_cols = [col for col in demo_cols if 'P_' in col and '__21' in col]
if pop_cols:
df['POP_TOT_21'] = df[pop_cols].sum(axis=1)
print(f"Popolazione totale 2021 calcolata dalla somma di {len(pop_cols)} colonne demografiche")
# Creiamo densità abitativa (popolazione per km²)
if 'cell_a_m2' in df.columns:
df['cell_a_m2'] = pd.to_numeric(df['cell_a_m2'], errors='coerce')
# Abitanti per km²
df['densita_abitativa'] = np.where(df['cell_a_m2'] > 0,
df['ISTAT||POP_TOT__21'] / (df['cell_a_m2'] / 1000000),
0)
print("Calcolata densità abitativa (abitanti per km²)")
Popolazione totale 2021 disponibile direttamente Calcolata densità abitativa (abitanti per km²)
/tmp/ipykernel_1171799/3340331269.py:22: PerformanceWarning: DataFrame is highly fragmented. This is usually the result of calling `frame.insert` many times, which has poor performance. Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()` df['densita_abitativa'] = np.where(df['cell_a_m2'] > 0,
Indice di urbanizzazione¶
In [9]:
if 'CLC||Urban' in df.columns and 'cell_a_m2' in df.columns:
# Calcola l'indice di urbanizzazione (% di area urbanizzata)
df['indice_urbanizzazione'] = np.where(df['cell_a_m2'] > 0,
df['CLC||Urban'] / df['cell_a_m2'] * 100,
0)
print("Calcolato indice di urbanizzazione")
Calcolato indice di urbanizzazione
/tmp/ipykernel_1171799/392824534.py:3: PerformanceWarning: DataFrame is highly fragmented. This is usually the result of calling `frame.insert` many times, which has poor performance. Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()` df['indice_urbanizzazione'] = np.where(df['cell_a_m2'] > 0,
Indice di invecchiamento¶
In [10]:
if 'ISTAT||P_65_99__21' in df.columns and 'ISTAT||P_0_14__21' in df.columns:
# Calcola indice invecchiamento (anziani / giovani * 100)
df['indice_invecchiamento'] = np.where(df['ISTAT||P_0_14__21'] > 0,
df['ISTAT||P_65_99__21'] / df['ISTAT||P_0_14__21'] * 100,
0)
df['indice_invecchiamento'] = df['indice_invecchiamento'].clip(0, 500)
print("Calcolato indice di invecchiamento")
Calcolato indice di invecchiamento
/tmp/ipykernel_1171799/3750055049.py:3: PerformanceWarning: DataFrame is highly fragmented. This is usually the result of calling `frame.insert` many times, which has poor performance. Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()` df['indice_invecchiamento'] = np.where(df['ISTAT||P_0_14__21'] > 0,
Creazione dell'indice di accessibilità¶
Indice basato sul numero di servizi/infrastrutture raggiungibili a piedi in 15 minuti
In [11]:
if any('walking__15' in col for col in df.columns):
# Selezioniamo le colonne relative ai servizi essenziali più rilevanti per l'housing
essential_services = [
'ovm_infr_sanita||walking__15__n_sanita',
'ovm_infr_istruzione||walking__15__n_istruzione',
'ovm_infr_commercio||walking__15__n_commercio',
'ovm_infr_trasporti||walking__15__n_trasporti'
]
# Verifichiamo quali servizi sono disponibili nel dataset
available_services = [col for col in essential_services if col in df.columns]
if available_services:
# Standardizziamo i valori
scaler = StandardScaler()
services_data = df[available_services].copy()
# Gestiamo i NaN sostituendoli con 0 (no servizi)
services_data = services_data.fillna(0)
# Standardizziamo
services_scaled = scaler.fit_transform(services_data)
# Creiamo l'indice come media dei valori standardizzati
df['indice_accessibilita_servizi'] = np.mean(services_scaled, axis=1)
# Normalizziamo l'indice tra 0 e 100 per interpretabilità
min_val = df['indice_accessibilita_servizi'].min()
max_val = df['indice_accessibilita_servizi'].max()
df['indice_accessibilita_servizi'] = (df['indice_accessibilita_servizi'] - min_val) / (max_val - min_val) * 100
print(f"Calcolato indice di accessibilità pedonale ai servizi basato su {len(available_services)} servizi")
Creazione indice di bisogno abitativo (housing need)¶
Combiniamo densità abitativa, accessibilità e reddito
In [12]:
# Verifichiamo quali indicatori sono disponibili
available_indicators = []
if 'densita_abitativa' in df.columns:
available_indicators.append('densita_abitativa')
if 'indice_accessibilita_servizi' in df.columns:
available_indicators.append('indice_accessibilita_servizi')
if 'reddito_pro_capite||redd_pc' in df.columns:
df['reddito_pc'] = df['reddito_pro_capite||redd_pc']
available_indicators.append('reddito_pc')
if 'indice_invecchiamento' in df.columns:
available_indicators.append('indice_invecchiamento')
if 'indice_urbanizzazione' in df.columns:
available_indicators.append('indice_urbanizzazione')
# Creiamo l'indice composito solo se abbiamo almeno 3 indicatori
if len(available_indicators) >= 3:
# Prendiamo solo le righe senza NaN per gli indicatori selezionati
df_index = df.dropna(subset=available_indicators).copy()
# Standardizziamo gli indicatori
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df_index[available_indicators])
X_scaled_df = pd.DataFrame(X_scaled, columns=available_indicators, index=df_index.index)
# Calcoliamo l'indice composito
# Alta densità, alta accessibilità, basso reddito, alto invecchiamento = alto bisogno
if 'densita_abitativa' in available_indicators:
df_index['densita_norm'] = X_scaled_df['densita_abitativa']
if 'indice_accessibilita_servizi' in available_indicators:
df_index['accessibilita_norm'] = X_scaled_df['indice_accessibilita_servizi']
if 'reddito_pc' in available_indicators:
# Invertiamo il reddito (reddito basso = maggior bisogno)
df_index['reddito_norm'] = -X_scaled_df['reddito_pc']
if 'indice_invecchiamento' in available_indicators:
df_index['invecchiamento_norm'] = X_scaled_df['indice_invecchiamento']
if 'indice_urbanizzazione' in available_indicators:
df_index['urbanizzazione_norm'] = X_scaled_df['indice_urbanizzazione']
# Calcoliamo l'indice come media pesata dei fattori normalizzati
weights = {}
norm_columns = [col for col in df_index.columns if '_norm' in col]
for col in norm_columns:
if 'densita_norm' in col:
weights[col] = 0.25 # Alto peso alla densità
elif 'accessibilita_norm' in col:
weights[col] = 0.20 # Peso medio all'accessibilità
elif 'reddito_norm' in col:
weights[col] = 0.30 # Alto peso al reddito (invertito)
elif 'invecchiamento_norm' in col:
weights[col] = 0.15 # Peso basso all'invecchiamento
elif 'urbanizzazione_norm' in col:
weights[col] = 0.10 # Peso basso all'urbanizzazione
df_index['indice_bisogno_abitativo'] = sum(df_index[col] * weights[col] for col in norm_columns)
# Normalizziamo tra 0 e 100
min_val = df_index['indice_bisogno_abitativo'].min()
max_val = df_index['indice_bisogno_abitativo'].max()
df_index['indice_bisogno_abitativo'] = (df_index['indice_bisogno_abitativo'] - min_val) / (max_val - min_val) * 100
# Aggiungiamo l'indice al dataframe originale
df.loc[df_index.index, 'indice_bisogno_abitativo'] = df_index['indice_bisogno_abitativo']
print(f"Calcolato indice di bisogno abitativo basato su {len(available_indicators)} indicatori")
/tmp/ipykernel_1171799/2238622374.py:11: PerformanceWarning: DataFrame is highly fragmented. This is usually the result of calling `frame.insert` many times, which has poor performance. Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()` df['reddito_pc'] = df['reddito_pro_capite||redd_pc']
Calcolato indice di bisogno abitativo basato su 4 indicatori
/tmp/ipykernel_1171799/2238622374.py:72: PerformanceWarning: DataFrame is highly fragmented. This is usually the result of calling `frame.insert` many times, which has poor performance. Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()` df.loc[df_index.index, 'indice_bisogno_abitativo'] = df_index['indice_bisogno_abitativo']
In [13]:
# Visualizzazione della distribuzione dell'indice di bisogno abitativo
if 'indice_bisogno_abitativo' in df.columns:
plt.figure(figsize=(12, 6))
sns.histplot(df['indice_bisogno_abitativo'].dropna(), bins=30, kde=True)
plt.title('Distribuzione dell\'indice di bisogno abitativo')
plt.xlabel('Indice di bisogno abitativo (0-100)')
plt.ylabel('Numero di celle')
plt.show()
# Categorizzazione dell'indice in 5 classi
df['classe_bisogno_abitativo'] = pd.qcut(
df['indice_bisogno_abitativo'].dropna(),
5,
labels=['Molto basso', 'Basso', 'Medio', 'Alto', 'Molto alto']
)
plt.figure(figsize=(12, 6))
sns.countplot(y='classe_bisogno_abitativo', data=df)
plt.title('Distribuzione delle classi di bisogno abitativo')
plt.xlabel('Numero di celle')
plt.ylabel('Classe di bisogno abitativo')
plt.show()
/tmp/ipykernel_1171799/898319565.py:11: PerformanceWarning: DataFrame is highly fragmented. This is usually the result of calling `frame.insert` many times, which has poor performance. Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()` df['classe_bisogno_abitativo'] = pd.qcut(
In [14]:
# Visualizzazione su mappa dell'indice di bisogno abitativo con classificazione di Jenks
import geopandas as gpd
from shapely import wkt
from jenkspy import jenks_breaks
# Verifico se la colonna geometry è presente nel dataset
if 'geometry' in df.columns and 'indice_bisogno_abitativo' in df.columns:
# Converti la colonna geometry in oggetti geometrici
try:
# Crea una copia del dataframe per evitare warning SettingWithCopyWarning
df_copy = df.copy()
df_copy['geometry'] = df_copy['geometry'].apply(lambda x: wkt.loads(x) if isinstance(x, str) else x)
# Crea GeoDataFrame con il CRS corretto (EPSG:32632)
gdf = gpd.GeoDataFrame(df_copy, geometry='geometry', crs="EPSG:32632")
# Elimina le righe senza l'indice di bisogno abitativo
gdf = gdf.dropna(subset=['indice_bisogno_abitativo'])
# Calcola i natural breaks di Jenks per la classificazione
n_classes = 5
breaks = jenks_breaks(gdf['indice_bisogno_abitativo'].values, n_classes=n_classes)
# Crea una nuova colonna con le classi di Jenks
labels = [f'Classe {i+1}' for i in range(len(breaks)-1)]
gdf.loc[:, 'jenks_bins'] = pd.cut(
gdf['indice_bisogno_abitativo'],
bins=breaks,
labels=labels,
include_lowest=True
)
# Crea una figura più grande per la mappa
fig, ax = plt.subplots(figsize=(15, 15))
# Visualizza la mappa con la classificazione di Jenks
gdf.plot(
column='indice_bisogno_abitativo',
scheme='user_defined',
classification_kwds={'bins': breaks},
cmap='RdYlGn_r', # Rosso per valori alti (alto bisogno), verde per valori bassi
legend=True,
ax=ax,
legend_kwds={
'title': 'Indice di bisogno abitativo',
'loc': 'lower right',
'fontsize': 10
}
)
# Aggiungi titolo
plt.title('Mappa dell\'indice di bisogno abitativo', fontsize=15)
# Rimuovi assi
ax.set_axis_off()
# Mostra la mappa
plt.tight_layout()
plt.show()
# Visualizza le statistiche per classe - con observed=True per evitare il warning
class_stats = gdf.groupby('jenks_bins', observed=True)['indice_bisogno_abitativo'].agg(['min', 'max', 'mean', 'count'])
class_stats['percentuale'] = class_stats['count'] / class_stats['count'].sum() * 100
print("\nStatistiche per classi di Jenks dell'indice di bisogno abitativo:")
print(class_stats)
# Mappa delle aree prioritarie per l'intervento delle ASA
if 'area_prioritaria_ASA' in gdf.columns:
fig, ax = plt.subplots(figsize=(15, 15))
# Plot di base con tutte le aree in grigio chiaro
gdf.plot(color='lightgrey', ax=ax)
# Plot delle aree prioritarie in rosso
gdf[gdf['area_prioritaria_ASA']].plot(
color='red',
ax=ax,
alpha=0.7
)
# Aggiungi titolo
plt.title('Aree prioritarie per l\'intervento delle Agenzie Sociali per l\'Affitto', fontsize=15)
# Rimuovi assi
ax.set_axis_off()
# Aggiungi legenda manuale
from matplotlib.patches import Patch
legend_elements = [
Patch(facecolor='lightgrey', edgecolor='grey', label='Aree non prioritarie'),
Patch(facecolor='red', edgecolor='darkred', alpha=0.7, label='Aree prioritarie per ASA')
]
ax.legend(handles=legend_elements, loc='lower right')
plt.tight_layout()
plt.show()
except Exception as e:
print(f"Errore nella creazione della mappa: {e}")
print("Verificare che la colonna geometry contenga dati geometrici validi")
else:
print("Impossibile creare la mappa: colonna geometry o indice_bisogno_abitativo non presenti nel dataset")
Statistiche per classi di Jenks dell'indice di bisogno abitativo:
min max mean count percentuale
jenks_bins
Classe 1 0.000000 23.330223 17.800496 460 20.122485
Classe 2 23.382602 32.676184 28.924763 1026 44.881890
Classe 3 32.750643 42.526431 36.553276 570 24.934383
Classe 4 42.619442 59.709691 48.523306 189 8.267717
Classe 5 60.535963 100.000000 71.508643 41 1.793526
Clustering delle aree per identificare pattern diversi di bisogno abitativo¶
In [15]:
# Clustering delle aree per identificare pattern diversi di bisogno abitativo
# Selezioniamo le variabili per il clustering
print("\n--- Analisi di clustering per identificare pattern di bisogno abitativo ---")
# Verifichiamo quali indicatori sono disponibili per il clustering
cluster_vars = []
for var in ['densita_abitativa', 'reddito_pc', 'indice_accessibilita_servizi',
'indice_bisogno_abitativo', 'indice_invecchiamento', 'indice_urbanizzazione']:
if var in df.columns:
cluster_vars.append(var)
print(f"Variabili utilizzate per il clustering: {cluster_vars}")
if len(cluster_vars) >= 3: # Assicuriamoci di avere almeno 3 variabili
# Creiamo una copia e eliminiamo righe con NaN
df_cluster = df.dropna(subset=cluster_vars).copy()
if len(df_cluster) > 100: # Assicuriamoci di avere abbastanza dati
# Standardizziamo
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df_cluster[cluster_vars])
# Applichiamo K-means direttamente con un numero di cluster predefinito
n_clusters = 5
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
df_cluster.loc[:, 'cluster'] = kmeans.fit_predict(X_scaled)
# Analizziamo le caratteristiche dei cluster
cluster_means = df_cluster.groupby('cluster')[cluster_vars].mean()
print("\nCaratteristiche medie dei cluster:")
print(cluster_means)
# Conteggio delle osservazioni per cluster
cluster_counts = df_cluster['cluster'].value_counts().sort_index()
print("\nNumero di celle per cluster:")
print(cluster_counts)
# Visualizziamo i cluster in base a due variabili principali
plt.figure(figsize=(12, 8))
scatter = plt.scatter(
df_cluster['indice_bisogno_abitativo'],
df_cluster['reddito_pc'] if 'reddito_pc' in df_cluster.columns else df_cluster[cluster_vars[1]],
c=df_cluster['cluster'],
cmap='viridis',
s=50,
alpha=0.7
)
plt.colorbar(scatter, label='Cluster')
plt.xlabel('Indice di bisogno abitativo')
plt.ylabel('Reddito pro capite' if 'reddito_pc' in df_cluster.columns else cluster_vars[1])
plt.title('Cluster di aree in base a bisogno abitativo e reddito')
plt.grid(True)
plt.show()
# Visualizza distribuzione caratteristiche per cluster
fig, axes = plt.subplots(nrows=len(cluster_vars), figsize=(15, 4*len(cluster_vars)))
for i, var in enumerate(cluster_vars):
for cluster_id in range(n_clusters):
sns.kdeplot(
df_cluster[df_cluster['cluster'] == cluster_id][var],
ax=axes[i],
label=f'Cluster {cluster_id}'
)
axes[i].set_title(f'Distribuzione di {var} per cluster')
axes[i].set_xlabel(var)
axes[i].set_ylabel('Densità')
axes[i].legend()
axes[i].grid(True)
plt.tight_layout()
plt.show()
# Visualizza profili dei cluster
plt.figure(figsize=(15, 8))
cluster_means_norm = cluster_means.copy()
# Normalizziamo per visualizzazione
for col in cluster_means_norm.columns:
cluster_means_norm[col] = (cluster_means_norm[col] - cluster_means_norm[col].min()) / \
(cluster_means_norm[col].max() - cluster_means_norm[col].min())
# Trasposizione per visualizzazione
cluster_means_norm = cluster_means_norm.T
# Plot
ax = cluster_means_norm.plot(kind='bar', figsize=(15, 8), width=0.8)
plt.title('Profili dei cluster (valori normalizzati)', fontsize=16)
plt.xlabel('Variabili', fontsize=14)
plt.ylabel('Valore normalizzato', fontsize=14)
plt.legend(title='Cluster')
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.xticks(rotation=45, ha='right')
# Aggiungiamo etichette alle barre
for container in ax.containers:
ax.bar_label(container, fmt='%.2f', fontsize=8)
plt.tight_layout()
plt.show()
# Aggiungiamo il risultato del clustering al dataframe originale
df.loc[df_cluster.index, 'cluster_abitativo'] = df_cluster['cluster']
print("\nCluster completato. È stata aggiunta la colonna 'cluster_abitativo' al dataframe.")
else:
print(f"Impossibile eseguire il clustering: solo {len(df_cluster)} osservazioni disponibili dopo la rimozione dei NaN.")
else:
print(f"Impossibile eseguire il clustering: servono almeno 3 variabili, trovate {len(cluster_vars)}.")
--- Analisi di clustering per identificare pattern di bisogno abitativo --- Variabili utilizzate per il clustering: ['densita_abitativa', 'reddito_pc', 'indice_bisogno_abitativo', 'indice_invecchiamento', 'indice_urbanizzazione']
Caratteristiche medie dei cluster:
densita_abitativa reddito_pc indice_bisogno_abitativo \
cluster
0 93.302017 243.101325 31.075536
1 471.779655 19685.403972 20.760952
2 4856.416349 18746.720512 37.799969
3 164.050142 3981.319809 37.157320
4 15867.599916 19796.807893 63.037880
indice_invecchiamento indice_urbanizzazione
cluster
0 23.877071 6.935050
1 137.594871 8.098522
2 175.758200 71.757414
3 254.614723 4.075281
4 221.606468 92.170555
Numero di celle per cluster:
cluster
0 813
1 704
2 353
3 334
4 82
Name: count, dtype: int64
<Figure size 1500x800 with 0 Axes>
Cluster completato. È stata aggiunta la colonna 'cluster_abitativo' al dataframe.
/tmp/ipykernel_1171799/1377859242.py:103: PerformanceWarning: DataFrame is highly fragmented. This is usually the result of calling `frame.insert` many times, which has poor performance. Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()` df.loc[df_cluster.index, 'cluster_abitativo'] = df_cluster['cluster']
Analisi dell'indice di bisogno abitativo per densità abitativa¶
Questo ci aiuta a identificare aree ad alta densità con alto bisogno
In [16]:
if all(col in df.columns for col in ['indice_bisogno_abitativo', 'densita_abitativa']):
plt.figure(figsize=(12, 8))
plt.scatter(
df['densita_abitativa'].clip(0, df['densita_abitativa'].quantile(0.99)), # Clipping per visualizzazione
df['indice_bisogno_abitativo'],
alpha=0.5
)
plt.xlabel('Densità abitativa (ab/km²)')
plt.ylabel('Indice di bisogno abitativo')
plt.title('Relazione tra densità abitativa e bisogno abitativo')
plt.grid(True)
plt.show()
Individuazione delle aree prioritarie per l'intervento delle Agenzie Sociali per l'Affitto¶
Creiamo un indicatore di priorità di intervento
In [17]:
if 'indice_bisogno_abitativo' in df.columns:
# Definiamo aree prioritarie quelle con alto bisogno abitativo
high_need = df['indice_bisogno_abitativo'] > df['indice_bisogno_abitativo'].quantile(0.8)
# Se disponibile, consideriamo anche il reddito
if 'reddito_pc' in df.columns:
low_income = df['reddito_pc'] < df['reddito_pc'].quantile(0.3)
priority_areas = high_need & low_income
else:
priority_areas = high_need
# Aggiungiamo colonna per aree prioritarie
df['area_prioritaria_ASA'] = priority_areas
# Contiamo quante celle sono prioritarie
num_priority = priority_areas.sum()
print(f"Identificate {num_priority} celle come aree prioritarie per l'intervento delle Agenzie Sociali per l'Affitto")
# Calcoliamo popolazione totale in queste aree
if 'ISTAT||POP_TOT__21' in df.columns:
pop_priority = df.loc[priority_areas, 'ISTAT||POP_TOT__21'].sum()
total_pop = df['ISTAT||POP_TOT__21'].sum()
print(f"Popolazione nelle aree prioritarie: {pop_priority:.0f} abitanti")
print(f"Percentuale della popolazione totale: {pop_priority/total_pop*100:.1f}%")
Identificate 0 celle come aree prioritarie per l'intervento delle Agenzie Sociali per l'Affitto Popolazione nelle aree prioritarie: 0 abitanti Percentuale della popolazione totale: 0.0%
/tmp/ipykernel_1171799/3107104142.py:13: PerformanceWarning: DataFrame is highly fragmented. This is usually the result of calling `frame.insert` many times, which has poor performance. Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()` df['area_prioritaria_ASA'] = priority_areas
Visualizzazione delle caratteristiche delle aree prioritarie vs non prioritarie¶
In [18]:
# Clustering delle aree per identificare pattern diversi di bisogno abitativo
print("\n--- Analisi di clustering per identificare pattern di bisogno abitativo ---")
# Verifichiamo quali indicatori sono disponibili per il clustering
available_vars = []
for var in ['densita_abitativa', 'reddito_pc', 'indice_bisogno_abitativo',
'indice_invecchiamento', 'ISTAT||POP_TOT__21']:
if var in df.columns:
available_vars.append(var)
print(f"Variabili disponibili per il clustering: {available_vars}")
# Se abbiamo almeno 2 variabili, procediamo con il clustering
if len(available_vars) >= 2:
# Creiamo una copia e eliminiamo righe con NaN
df_cluster = df.dropna(subset=available_vars).copy()
if len(df_cluster) > 100: # Assicuriamoci di avere abbastanza dati
# Standardizziamo
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df_cluster[available_vars])
# Applichiamo K-means direttamente con un numero di cluster predefinito
n_clusters = 5
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
df_cluster.loc[:, 'cluster'] = kmeans.fit_predict(X_scaled)
# Analizziamo le caratteristiche dei cluster
cluster_means = df_cluster.groupby('cluster')[available_vars].mean()
print("\nCaratteristiche medie dei cluster:")
print(cluster_means)
# Conteggio delle osservazioni per cluster
cluster_counts = df_cluster['cluster'].value_counts().sort_index()
print("\nNumero di celle per cluster:")
print(cluster_counts)
# Scegliamo le due variabili principali per la visualizzazione
if 'indice_bisogno_abitativo' in available_vars and 'reddito_pc' in available_vars:
var_x = 'indice_bisogno_abitativo'
var_y = 'reddito_pc'
elif len(available_vars) >= 2:
var_x = available_vars[0]
var_y = available_vars[1]
else:
# Fallback nel caso improbabile che abbiamo solo una variabile
var_x = available_vars[0]
var_y = available_vars[0]
# Visualizziamo i cluster in base alle due variabili scelte
plt.figure(figsize=(12, 8))
scatter = plt.scatter(
df_cluster[var_x],
df_cluster[var_y],
c=df_cluster['cluster'],
cmap='viridis',
s=50,
alpha=0.7
)
plt.colorbar(scatter, label='Cluster')
plt.xlabel(var_x.replace('_', ' ').title())
plt.ylabel(var_y.replace('_', ' ').title())
plt.title(f'Cluster di aree in base a {var_x.replace("_", " ")} e {var_y.replace("_", " ")}')
plt.grid(True)
plt.show()
# Visualizza distribuzione caratteristiche per cluster
fig, axes = plt.subplots(nrows=len(available_vars), figsize=(15, 4*len(available_vars)))
# Gestiamo il caso di una sola variabile
if len(available_vars) == 1:
axes = [axes]
for i, var in enumerate(available_vars):
for cluster_id in range(n_clusters):
sns.kdeplot(
df_cluster[df_cluster['cluster'] == cluster_id][var],
ax=axes[i],
label=f'Cluster {cluster_id}'
)
axes[i].set_title(f'Distribuzione di {var.replace("_", " ").title()} per cluster')
axes[i].set_xlabel(var.replace('_', ' ').title())
axes[i].set_ylabel('Densità')
axes[i].legend()
axes[i].grid(True)
plt.tight_layout()
plt.show()
# Visualizza profili dei cluster
plt.figure(figsize=(15, 8))
cluster_means_norm = cluster_means.copy()
# Normalizziamo per visualizzazione
for col in cluster_means_norm.columns:
if cluster_means_norm[col].max() > cluster_means_norm[col].min(): # Evita divisione per zero
cluster_means_norm[col] = (cluster_means_norm[col] - cluster_means_norm[col].min()) / \
(cluster_means_norm[col].max() - cluster_means_norm[col].min())
# Trasposizione per visualizzazione
cluster_means_norm = cluster_means_norm.T
# Plot
ax = cluster_means_norm.plot(kind='bar', figsize=(15, 8), width=0.8)
plt.title('Profili dei cluster (valori normalizzati)', fontsize=16)
plt.xlabel('Variabili', fontsize=14)
plt.ylabel('Valore normalizzato', fontsize=14)
plt.legend(title='Cluster')
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.xticks(rotation=45, ha='right')
# Aggiungiamo etichette alle barre
for container in ax.containers:
ax.bar_label(container, fmt='%.2f', fontsize=8)
plt.tight_layout()
plt.show()
# Aggiungiamo il risultato del clustering al dataframe originale
df.loc[df_cluster.index, 'cluster_abitativo'] = df_cluster['cluster']
print("\nCluster completato. È stata aggiunta la colonna 'cluster_abitativo' al dataframe.")
else:
print(f"Impossibile eseguire il clustering: solo {len(df_cluster)} osservazioni disponibili dopo la rimozione dei NaN.")
else:
print(f"Impossibile eseguire il clustering: servono almeno 2 variabili, trovate {len(available_vars)}.")
--- Analisi di clustering per identificare pattern di bisogno abitativo --- Variabili disponibili per il clustering: ['densita_abitativa', 'reddito_pc', 'indice_bisogno_abitativo', 'indice_invecchiamento', 'ISTAT||POP_TOT__21']
Caratteristiche medie dei cluster:
densita_abitativa reddito_pc indice_bisogno_abitativo \
cluster
0 6483.014847 19283.985852 41.526112
1 86.346064 293.485950 30.797785
2 668.613287 19779.280767 21.957799
3 17018.524340 19379.756359 65.685866
4 234.547111 2744.358938 37.578250
indice_invecchiamento ISTAT||POP_TOT__21
cluster
0 193.053282 5123.779636
1 16.723203 68.274058
2 143.879570 528.443553
3 221.447272 13450.870563
4 229.170940 185.315356
Numero di celle per cluster:
cluster
0 255
1 767
2 817
3 66
4 381
Name: count, dtype: int64
<Figure size 1500x800 with 0 Axes>
Cluster completato. È stata aggiunta la colonna 'cluster_abitativo' al dataframe.
In [19]:
# Visualizzazione delle caratteristiche delle aree prioritarie vs non prioritarie
if 'area_prioritaria_ASA' in df.columns:
# Identifichiamo le colonne effettivamente disponibili
potential_cols = ['indice_bisogno_abitativo', 'densita_abitativa', 'reddito_pc',
'indice_invecchiamento', 'ISTAT||POP_TOT__21']
# Filtriamo solo le colonne presenti nel dataframe
available_cols = [col for col in potential_cols if col in df.columns]
print(f"\nColonne disponibili per il confronto: {available_cols}")
if available_cols:
# Preparazione dati per il confronto
priority_stats = df.groupby('area_prioritaria_ASA')[available_cols].mean()
# Visualizzazione comparativa
plt.figure(figsize=(15, 10))
# Plot per ogni indicatore disponibile
n_indicators = len(available_cols)
n_rows = (n_indicators + 1) // 2 # Arrotondiamo per eccesso
for i, indicator in enumerate(available_cols, 1):
plt.subplot(n_rows, 2, i)
# Creazione barplot
bars = plt.bar(
[0, 1],
priority_stats[indicator],
tick_label=['Non prioritarie', 'Prioritarie']
)
# Colorazione delle barre
bars[0].set_color('lightblue')
bars[1].set_color('darkblue')
plt.title(indicator.replace('_', ' ').title())
plt.grid(axis='y', linestyle='--', alpha=0.7)
# Aggiungiamo i valori sopra le barre
for bar in bars:
height = bar.get_height()
plt.text(
bar.get_x() + bar.get_width()/2.,
height*1.01,
f'{height:.1f}',
ha='center',
va='bottom'
)
plt.tight_layout()
plt.suptitle('Confronto tra aree prioritarie e non prioritarie', fontsize=16)
plt.subplots_adjust(top=0.92)
plt.show()
else:
print("Nessuna colonna disponibile per il confronto")
Colonne disponibili per il confronto: ['indice_bisogno_abitativo', 'densita_abitativa', 'reddito_pc', 'indice_invecchiamento', 'ISTAT||POP_TOT__21']
Creazione di un indice di sostenibilità economica dell'affitto¶
Questo indice stima quanto è sostenibile l'affitto in relazione al reddito
In [20]:
if 'reddito_pc' in df.columns:
# Stimiamo un canone medio mensile di affitto in base al reddito e all'area (simulazione)
# Questo è un esempio, in realtà il canone dovrebbe essere basato su dati reali di mercato
# ma possiamo fare una stima approssimativa
# Assumiamo che in media il canone sia proporzionale al reddito dell'area
# e che nelle aree urbanizzate il canone sia più alto a parità di reddito
if 'indice_urbanizzazione' in df.columns:
# Stimiamo il canone mensile (questo è un modello molto semplificato)
# Usiamo il reddito pro-capite come proxy e aggiungiamo un fattore di "pressione urbana"
df['canone_stimato_mensile'] = (df['reddito_pc'] / 50) * (1 + df['indice_urbanizzazione'] / 200)
else:
# Senza l'indice di urbanizzazione, usiamo solo il reddito
df['canone_stimato_mensile'] = df['reddito_pc'] / 40
# Calcoliamo l'incidenza del canone sul reddito (%)
df['incidenza_canone'] = (df['canone_stimato_mensile'] * 12 / df['reddito_pc']) * 100
# Classifichiamo la sostenibilità in base all'incidenza
conditions = [
df['incidenza_canone'] <= 25, # Sostenibile
(df['incidenza_canone'] > 25) & (df['incidenza_canone'] <= 40), # Critico
df['incidenza_canone'] > 40 # Non sostenibile
]
choices = ['Sostenibile', 'Critico', 'Non sostenibile']
df['sostenibilita_affitto'] = np.select(conditions, choices, default='Non classificato')
# Visualizziamo la distribuzione
plt.figure(figsize=(12, 6))
sns.histplot(df['incidenza_canone'].clip(0, 100), bins=30, kde=True)
plt.title('Distribuzione dell\'incidenza del canone di affitto sul reddito')
plt.xlabel('Incidenza canone sul reddito (%)')
plt.ylabel('Numero di celle')
plt.axvline(x=25, color='green', linestyle='--', label='Limite sostenibilità (25%)')
plt.axvline(x=40, color='red', linestyle='--', label='Limite criticità (40%)')
plt.legend()
plt.show()
# Visualizziamo la distribuzione delle categorie di sostenibilità
plt.figure(figsize=(10, 6))
sns.countplot(y='sostenibilita_affitto', data=df)
plt.title('Distribuzione delle categorie di sostenibilità dell\'affitto')
plt.xlabel('Numero di celle')
plt.ylabel('Categoria di sostenibilità')
plt.show()
/tmp/ipykernel_1171799/2704280889.py:11: PerformanceWarning: DataFrame is highly fragmented. This is usually the result of calling `frame.insert` many times, which has poor performance. Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()` df['canone_stimato_mensile'] = (df['reddito_pc'] / 50) * (1 + df['indice_urbanizzazione'] / 200) /tmp/ipykernel_1171799/2704280889.py:17: PerformanceWarning: DataFrame is highly fragmented. This is usually the result of calling `frame.insert` many times, which has poor performance. Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()` df['incidenza_canone'] = (df['canone_stimato_mensile'] * 12 / df['reddito_pc']) * 100 /tmp/ipykernel_1171799/2704280889.py:27: PerformanceWarning: DataFrame is highly fragmented. This is usually the result of calling `frame.insert` many times, which has poor performance. Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()` df['sostenibilita_affitto'] = np.select(conditions, choices, default='Non classificato')
Sintesi dei risultati per il tavolo SOCIAL RENTAL AGENCIES¶
In [21]:
print("\n=== SINTESI DEI RISULTATI PER IL TAVOLO SOCIAL RENTAL AGENCIES ===")
# Riassumiamo i principali indicatori
print("\n1. Distribuzione dell'indice di bisogno abitativo:")
if 'classe_bisogno_abitativo' in df.columns:
print(df['classe_bisogno_abitativo'].value_counts(normalize=True).sort_index() * 100)
print("\n2. Sostenibilità economica dell'affitto:")
if 'sostenibilita_affitto' in df.columns:
sostenibilita_counts = df['sostenibilita_affitto'].value_counts()
sostenibilita_perc = sostenibilita_counts / sostenibilita_counts.sum() * 100
print(sostenibilita_perc)
print("\n3. Aree prioritarie per l'intervento delle Agenzie Sociali per l'Affitto:")
if 'area_prioritaria_ASA' in df.columns and 'ISTAT||POP_TOT__21' in df.columns:
pop_priority = df.loc[df['area_prioritaria_ASA'], 'ISTAT||POP_TOT__21'].sum()
total_pop = df['ISTAT||POP_TOT__21'].sum()
print(f"- Popolazione nelle aree prioritarie: {pop_priority:.0f} abitanti ({pop_priority/total_pop*100:.1f}% del totale)")
# Caratteristiche delle aree prioritarie
if 'reddito_pc' in df.columns:
redd_priority = df.loc[df['area_prioritaria_ASA'], 'reddito_pc'].mean()
redd_non_priority = df.loc[~df['area_prioritaria_ASA'], 'reddito_pc'].mean()
print(f"- Reddito medio nelle aree prioritarie: {redd_priority:.0f} € (vs {redd_non_priority:.0f} € nelle aree non prioritarie)")
if 'incidenza_canone' in df.columns:
inc_priority = df.loc[df['area_prioritaria_ASA'], 'incidenza_canone'].mean()
inc_non_priority = df.loc[~df['area_prioritaria_ASA'], 'incidenza_canone'].mean()
print(f"- Incidenza media del canone nelle aree prioritarie: {inc_priority:.1f}% (vs {inc_non_priority:.1f}% nelle aree non prioritarie)")
print("\n4. Raccomandazioni per il tavolo SOCIAL RENTAL AGENCIES:")
print("- Concentrare l'intervento delle ASA nelle aree identificate come prioritarie")
print("- Sviluppare strumenti di garanzia per i proprietari nelle aree a maggiore difficoltà abitativa")
print("- Prevedere contributi per ridurre l'incidenza del canone sul reddito nelle aree critiche")
print("- Implementare politiche integrate di rigenerazione urbana e accesso all'abitazione")
=== SINTESI DEI RISULTATI PER IL TAVOLO SOCIAL RENTAL AGENCIES === 1. Distribuzione dell'indice di bisogno abitativo: classe_bisogno_abitativo Molto basso 20.034996 Basso 29.352581 Medio 10.629921 Alto 19.991251 Molto alto 19.991251 Name: proportion, dtype: float64 2. Sostenibilità economica dell'affitto: sostenibilita_affitto Non classificato 44.138233 Critico 28.958880 Sostenibile 26.902887 Name: count, dtype: float64 3. Aree prioritarie per l'intervento delle Agenzie Sociali per l'Affitto: - Popolazione nelle aree prioritarie: 0 abitanti (0.0% del totale) - Reddito medio nelle aree prioritarie: nan € (vs 10335 € nelle aree non prioritarie) - Incidenza media del canone nelle aree prioritarie: nan% (vs 27.6% nelle aree non prioritarie) 4. Raccomandazioni per il tavolo SOCIAL RENTAL AGENCIES: - Concentrare l'intervento delle ASA nelle aree identificate come prioritarie - Sviluppare strumenti di garanzia per i proprietari nelle aree a maggiore difficoltà abitativa - Prevedere contributi per ridurre l'incidenza del canone sul reddito nelle aree critiche - Implementare politiche integrate di rigenerazione urbana e accesso all'abitazione
In [22]:
cols_indicatori = [
# 'densita_abitativa',
# 'indice_urbanizzazione',
# 'indice_invecchiamento',
# 'reddito_pc',
'indice_bisogno_abitativo',
# 'classe_bisogno_abitativo',
# 'cluster_abitativo',
# 'area_prioritaria_ASA',
# 'canone_stimato_mensile',
# 'incidenza_canone',
# 'sostenibilita_affitto'
]
In [23]:
map_indicatori = {
'indice_bisogno_abitativo': 'IBA',
'area_prioritaria_ASA': 'APA'
}
In [24]:
n_classes = 5
for col in cols_indicatori:
# Calcola i natural breaks di Jenks per la classificazione
breaks = jenks_breaks(df[col].values, n_classes=n_classes)
# Crea una nuova colonna con le classi di Jenks
labels = [i+1 for i in range(len(breaks)-1)]
df.loc[:, f'{col}_'] = pd.cut(
df[col],
bins=breaks,
labels=labels,
include_lowest=True
)
df[col] = df[f'{col}_'].astype(int)
del df[f'{col}_']
df['area_prioritaria_ASA'] = df['area_prioritaria_ASA'].map({
True: 'Area Prioritaria ASA',
False: 'Area NON prioritaria'
})
df.rename(columns=map_indicatori, inplace=True)
gdf = gpd.GeoDataFrame(df[list(map_indicatori.values()) + ['geometry', f'h3_{resolution:02}']], geometry='geometry', crs=32632)
gdf['geometry'] = gdf['geometry'].centroid
gdf.to_crs(4326).to_file(os.path.join(out_path, f'grid_{resolution:02}_pt_tavolo3.geojson'), driver="GeoJSON")
/tmp/ipykernel_1171799/2549366052.py:9: PerformanceWarning: DataFrame is highly fragmented. This is usually the result of calling `frame.insert` many times, which has poor performance. Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()`
df.loc[:, f'{col}_'] = pd.cut(
In [ ]: