122 lines
5.2 KiB
Python
122 lines
5.2 KiB
Python
import logging
|
|
import numpy as np
|
|
from scipy import stats
|
|
|
|
DEFAULT_AGES = np.arange(0, 50)
|
|
DEFAULT_MORTALITY_DISTRIBUTION = 5 * stats.exponweib(a=1, c=3, loc=0, scale=30).pdf(DEFAULT_AGES)
|
|
DEFAULT_TOTAL_POPULATION = 40e6
|
|
|
|
|
|
def steady_state_population(mortality_distribution: np.array = None, total_population: int = DEFAULT_TOTAL_POPULATION):
|
|
"""
|
|
## Establishing the initial population
|
|
|
|
We use an array ranging from 0 to 50 years of age. We start with an equal distribution and run the simulation until
|
|
it arrives at the steady-state solution. _In reality, we'd use the real data here_.
|
|
|
|
Args:
|
|
mortality_distribution: The chance a vehicle that is _x_ years old leaves the population
|
|
total_population: The total vehicle population
|
|
|
|
Returns:
|
|
The steady-state population distribution, splitting the `total_population` vehicles across their ages.
|
|
"""
|
|
|
|
# Initialize mortality
|
|
if mortality_distribution is None:
|
|
mortality_distribution = DEFAULT_MORTALITY_DISTRIBUTION
|
|
|
|
# Initialize population with uniform distribution
|
|
population_distribution = np.ones(mortality_distribution.shape) * (total_population / len(mortality_distribution))
|
|
old_population_distribution = np.zeros(mortality_distribution.shape)
|
|
i = 0
|
|
|
|
while not np.all(np.isclose(population_distribution, old_population_distribution)):
|
|
old_population = sum(population_distribution)
|
|
old_population_distribution = population_distribution.copy()
|
|
i += 1
|
|
|
|
# First, simulate the mortality and keep count of the new vehicles we need to produce to replace the existing ones
|
|
new_cars = population_distribution[-1]
|
|
for j in reversed(range(len(mortality_distribution) - 1)):
|
|
population_distribution[j + 1] = (1 - mortality_distribution[j]) * population_distribution[j]
|
|
new_cars += (mortality_distribution[j] * population_distribution[j])
|
|
|
|
# Last, add new vehicles in otder for the sum to make sense
|
|
population_distribution[0] = 0
|
|
population_before_creation = sum(population_distribution)
|
|
new_cars = ((old_population) + 0) - population_before_creation
|
|
population_distribution[0] = new_cars
|
|
|
|
if i > 1000:
|
|
raise Exception("Failed to converge after 1000 iterations!")
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger.info(f"Steady-state population achieved after {i} iterations.")
|
|
|
|
return population_distribution
|
|
|
|
def calculate_subsidy_mortality(steady_state_population: np.array, normal_mortality_distribution: np.array,
|
|
min_age: int = 10, total_cars: int = 1e6):
|
|
"""
|
|
Calculates a mortality distribution for the subsidy year so that `total_cars` with an age greater or equal then
|
|
`min_age` leave the population.
|
|
"""
|
|
|
|
# Count the total cars older than the minimum age:
|
|
total_old_cars = np.sum(steady_state_population[min_age:])
|
|
|
|
assert total_old_cars > total_cars, "You cannot recycle more vehicles than exist."
|
|
|
|
# Calculate which fraction of them needs to be recycles
|
|
recycling_fraction = total_cars/total_old_cars
|
|
|
|
subsidy_mortality_distribution = normal_mortality_distribution.copy()
|
|
for i in np.arange(min_age, len(steady_state_population)):
|
|
subsidy_mortality_distribution[i] += recycling_fraction
|
|
assert subsidy_mortality_distribution[i] <= 1, "You cannot recycle more vehicles than exist."
|
|
|
|
return subsidy_mortality_distribution
|
|
|
|
def calculate_covid_mortality(steady_state_population: np.array, normal_mortality_distribution: np.array,
|
|
reduction_in_production: int = 0.23):
|
|
"""
|
|
Calculates a mortality distribution for the subsidy year so that `total_cars` with an age greater or equal then
|
|
`min_age` leave the population.
|
|
"""
|
|
return normal_mortality_distribution * (1-reduction_in_production)
|
|
|
|
def iterate(mortality_distribution: np.array, population_distribution: np.array, fraction_of_new_cars: float = 1.0):
|
|
old_population = sum(population_distribution)
|
|
mortality_totals = mortality_distribution * population_distribution
|
|
population_distribution -= mortality_totals
|
|
|
|
# First, simulate the mortality
|
|
for j in reversed(range(len(mortality_distribution) - 1)):
|
|
population_distribution[j + 1] = population_distribution[j]
|
|
|
|
# Last, add new vehicles in order for the sum to make sense
|
|
population_distribution[0] = 0
|
|
population_before_creation = sum(population_distribution)
|
|
new_cars = old_population - population_before_creation
|
|
population_distribution[0] = new_cars * fraction_of_new_cars
|
|
|
|
# assert new_cars == sum(mortality_totals)
|
|
|
|
return population_distribution, mortality_totals
|
|
|
|
|
|
def sanity_check_for_subsidy():
|
|
population_distribution = steady_state_population()
|
|
|
|
subsidy_mortality = calculate_subsidy_mortality(population_distribution, DEFAULT_MORTALITY_DISTRIBUTION)
|
|
|
|
new_population_distribution = iterate(DEFAULT_MORTALITY_DISTRIBUTION, population_distribution)
|
|
normal_year_new_cars = new_population_distribution[0]
|
|
|
|
subsidy_population_distribution = iterate(subsidy_mortality, population_distribution)
|
|
subsidy_new_cars = subsidy_population_distribution[0]
|
|
|
|
assert np.abs((subsidy_new_cars - normal_year_new_cars) - 1e6) < 10000
|
|
|