"""
Applies the Newton Krylov solver Object to the static GS problem.
Implements both forward and inverse GS solvers.
Copyright 2025 UKAEA, UKRI-STFC, and The Authors, as per the COPYRIGHT and README files.
This file is part of FreeGSNKE.
FreeGSNKE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
FreeGSNKE is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
You should have received a copy of the GNU Lesser General Public License
along with FreeGSNKE. If not, see <http://www.gnu.org/licenses/>.
"""
import warnings
from copy import deepcopy
import freegs4e
import matplotlib.pyplot as plt
import numpy as np
from freegs4e.gradshafranov import Greens
from . import nk_solver_H as nk_solver
[docs]
class NKGSsolver:
"""Solver for the non-linear forward Grad Shafranov (GS)
static problem. Here, the GS problem is written as a root
problem in the plasma flux psi. This root problem is
passed to and solved by the NewtonKrylov solver itself,
class nk_solver.
The solution domain is set at instantiation time, through the
input FreeGSNKE equilibrium object.
The non-linear solvers are called using the 'forward_solve', 'inverse_solve' or generic 'solve' methods.
"""
[docs]
def __init__(self, eq):
"""Instantiates the solver object.
Based on the domain grid of the input equilibrium object, it prepares
- the linear solver 'self.linear_GS_solver'
- the response matrix of boundary grid points 'self.greens_boundary'
Parameters
----------
eq : a FreeGSNKE equilibrium object.
The domain grid defined by (eq.R, eq.Z) is the solution domain
adopted for the GS problems. Calls to the nonlinear solver will
use the grid domain set at instantiation time. Re-instantiation
is necessary in order to change the propertes of either grid or
domain.
"""
# eq is an Equilibrium instance, it has to have the same domain and grid as
# the ones the solver will be called on
self.eqR = eq.R
R = eq.R
Z = eq.Z
self.R = R
self.Z = Z
R_1D = R[:, 0]
Z_1D = Z[0, :]
# for reshaping
nx, ny = np.shape(R)
self.nx = nx
self.ny = ny
# for integration
dR = R[1, 0] - R[0, 0]
dZ = Z[0, 1] - Z[0, 0]
self.dRdZ = dR * dZ
self.nksolver = nk_solver.nksolver(problem_dimension=self.nx * self.ny)
# linear solver for del*Psi=RHS with fixed RHS
self.linear_GS_solver = freegs4e.multigrid.createVcycle(
nx,
ny,
freegs4e.gradshafranov.GSsparse4thOrder(
eq.R[0, 0], eq.R[-1, 0], eq.Z[0, 0], eq.Z[0, -1]
),
nlevels=1,
ncycle=1,
niter=2,
direct=True,
)
# List of indices on the boundary
bndry_indices = np.concatenate(
[
[(x, 0) for x in range(nx)],
[(x, ny - 1) for x in range(nx)],
[(0, y) for y in np.arange(1, ny - 1)],
[(nx - 1, y) for y in np.arange(1, ny - 1)],
]
)
self.bndry_indices = bndry_indices
# matrices of responses of boundary locations to each grid positions
greenfunc = Greens(
R[np.newaxis, :, :],
Z[np.newaxis, :, :],
R_1D[bndry_indices[:, 0]][:, np.newaxis, np.newaxis],
Z_1D[bndry_indices[:, 1]][:, np.newaxis, np.newaxis],
)
# Prevent infinity/nan by removing Greens(x,y;x,y)
zeros = np.ones_like(greenfunc)
zeros[
np.arange(len(bndry_indices)), bndry_indices[:, 0], bndry_indices[:, 1]
] = 0
self.greenfunc = greenfunc * zeros * self.dRdZ
# RHS/Jtor
self.rhs_before_jtor = -freegs4e.gradshafranov.mu0 * eq.R
[docs]
def freeboundary(self, plasma_psi, tokamak_psi, profiles):
"""Imposes boundary conditions on set of boundary points.
Parameters
----------
plasma_psi : np.array of size eq.nx*eq.ny
magnetic flux due to the plasma
tokamak_psi : np.array of size eq.nx*eq.ny
magnetic flux due to the tokamak alone, including all metal currents,
in both active coils and passive structures
profiles : FreeGSNKE profile object
profile object describing target plasma properties.
Used to calculate current density jtor
"""
# tokamak_psi is the psi contribution due to the currents assigned to the tokamak coils in eq, ie.
# tokamak_psi = eq.tokamak.calcPsiFromGreens(pgreen=eq._pgreen)
# jtor and RHS given tokamak_psi above and the input plasma_psi
self.jtor = profiles.Jtor(
self.R,
self.Z,
(tokamak_psi + plasma_psi).reshape(self.nx, self.ny),
)
self.rhs = self.rhs_before_jtor * self.jtor
# calculates and imposes the boundary conditions
self.psi_boundary = np.zeros_like(self.R)
psi_bnd = np.sum(self.greenfunc * self.jtor[np.newaxis, :, :], axis=(-1, -2))
self.psi_boundary[:, 0] = psi_bnd[: self.nx]
self.psi_boundary[:, -1] = psi_bnd[self.nx : 2 * self.nx]
self.psi_boundary[0, 1 : self.ny - 1] = psi_bnd[
2 * self.nx : 2 * self.nx + self.ny - 2
]
self.psi_boundary[-1, 1 : self.ny - 1] = psi_bnd[2 * self.nx + self.ny - 2 :]
self.rhs[0, :] = self.psi_boundary[0, :]
self.rhs[:, 0] = self.psi_boundary[:, 0]
self.rhs[-1, :] = self.psi_boundary[-1, :]
self.rhs[:, -1] = self.psi_boundary[:, -1]
[docs]
def F_function(self, plasma_psi, tokamak_psi, profiles):
"""Residual of the nonlinear Grad Shafranov equation written as a root problem
F(plasma_psi) \equiv [\delta* - J](plasma_psi)
The plasma_psi that solves the Grad Shafranov problem satisfies
F(plasma_psi) = [\delta* - J](plasma_psi) = 0
Parameters
----------
plasma_psi : np.array of size eq.nx*eq.ny
magnetic flux due to the plasma
tokamak_psi : np.array of size eq.nx*eq.ny
magnetic flux due to the tokamak alone, including all metal currents,
in both active coils and passive structures
profiles : freegs4e profile object
profile object describing target plasma properties,
used to calculate current density jtor
Returns
-------
residual : np.array of size eq.nx*eq.ny
residual of the GS equation
"""
self.freeboundary(plasma_psi, tokamak_psi, profiles)
residual = plasma_psi - (
self.linear_GS_solver(self.psi_boundary, self.rhs)
).reshape(-1)
return residual
[docs]
def port_critical(self, eq, profiles):
"""Transfers critical points and other useful info from profile to equilibrium object,
after GS solution.
Parameters
----------
eq : FreeGSNKE equilibrium object
Equilibrium on which to record values
profiles : FreeGSNKE profile object
Profiles object which has been used to calculate Jtor.
"""
eq.solved = True
eq.xpt = np.copy(profiles.xpt)
eq.opt = np.copy(profiles.opt)
eq.psi_axis = eq.opt[0, 2]
eq.psi_bndry = profiles.psi_bndry
eq.flag_limiter = profiles.flag_limiter
eq._current = np.sum(profiles.jtor) * self.dRdZ
eq._profiles = deepcopy(profiles)
try:
eq.tokamak_psi = self.tokamak_psi.reshape(self.nx, self.ny)
except:
pass
[docs]
def relative_norm_residual(self, res, psi):
"""Calculates a normalised relative residual, based on linalg.norm
Parameters
----------
res : ndarray
Residual
psi : ndarray
plasma_psi
Returns
-------
float
Relative normalised residual
"""
return np.linalg.norm(res) / np.linalg.norm(psi)
[docs]
def relative_del_residual(self, res, psi):
"""Calculates a normalised relative residual, based on the norm max(.) - min(.)
Parameters
----------
res : ndarray
Residual
psi : ndarray
plasma_psi
Returns
-------
float, float
Relative normalised residual, norm(plasma_psi)
"""
del_psi = np.amax(psi) - np.amin(psi)
del_res = np.amax(res) - np.amin(res)
return del_res / del_psi, del_psi
[docs]
def forward_solve(
self,
eq,
profiles,
target_relative_tolerance,
max_solving_iterations=100,
Picard_handover=0.11,
step_size=2.5,
scaling_with_n=-1.0,
target_relative_unexplained_residual=0.2,
max_n_directions=16,
max_rel_update_size=0.2,
clip=10,
# clip_quantiles=None,
force_up_down_symmetric=False,
verbose=False,
suppress=False,
):
"""The method that actually solves the forward static GS problem.
A forward problem is specified by the 2 FreeGSNKE objects eq and profiles.
The first specifies the metal currents (throught eq.tokamak)
and the second specifies the desired plasma properties
(i.e. plasma current and profile functions).
The plasma_psi which solves the given GS problem is assigned to
the input eq, and can be found at eq.plasma_psi.
Parameters
----------
eq : FreeGSNKE equilibrium object
Used to extract the assigned metal currents, which in turn are
used to calculate the according self.tokamak_psi
profiles : FreeGSNKE profile object
Specifies the target properties of the plasma.
These are used to calculate Jtor(psi)
target_relative_tolerance : float
NK iterations are interrupted when this criterion is
satisfied. Relative convergence for the residual F(plasma_psi)
max_solving_iterations : int
NK iterations are interrupted when this limit is surpassed
Picard_handover : float
Value of relative tolerance above which a Picard iteration
is performed instead of a NK call
step_size : float
l2 norm of proposed step, in units of the size of the residual R0
scaling_with_n : float
allows to further scale the proposed steps as a function of the
number of previous steps already attempted
(1 + self.n_it)**scaling_with_n
target_relative_unexplained_residual : float between 0 and 1
terminates internal iterations when the considered directions
can (linearly) explain such a fraction of the initial residual R0
max_n_directions : int
terminates iteration even though condition on
explained residual is not met
max_rel_update_size : float
maximum relative update, in norm, to plasma_psi. If larger than this,
the norm of the update is reduced
clip : float
maximum size of the update due to each explored direction, in units
of exploratory step used to calculate the finite difference derivative
force_up_down_symmetric : bool
If True, the solver is forced on up-down symmetric trial solutions
verbose : bool
flag to allow progress printouts
suppress : bool
flag to allow suppress all printouts
"""
if suppress:
verbose = False
picard_flag = 0
if force_up_down_symmetric:
trial_plasma_psi = 0.5 * (eq.plasma_psi + eq.plasma_psi[:, ::-1]).reshape(
-1
)
self.shape = np.shape(eq.plasma_psi)
else:
trial_plasma_psi = np.copy(eq.plasma_psi).reshape(-1)
# self.tokamak_psi = (eq.tokamak.calcPsiFromGreens(pgreen=eq._pgreen)).reshape(-1)
self.tokamak_psi = eq.tokamak.getPsitokamak(vgreen=eq._vgreen).reshape(-1)
log = []
log.append("-----")
log.append("Forward static solve starting...")
control_trial_psi = False
n_up = 0.0 + 4 * eq.solved
# this tries to cure cases where plasma_psi is not large enough in modulus
# causing no core mask to exist
while (control_trial_psi is False) and (n_up < 10):
try:
res0 = self.F_function(trial_plasma_psi, self.tokamak_psi, profiles)
control_trial_psi = True
log.append("Initial guess for plasma_psi successful, residual found.")
except:
trial_plasma_psi /= 0.8
n_up += 1
log.append("Initial guess for plasma_psi failed, trying to scale...")
# this is in case the above did not work
# then use standard initialization
# and grow peak until core mask exists
if control_trial_psi is False:
log.append("Default plasma_psi initialisation and adjustment invoked.")
eq.plasma_psi = trial_plasma_psi = eq.create_psi_plasma_default(
adaptive_centre=True
)
eq.adjust_psi_plasma()
trial_plasma_psi = np.copy(eq.plasma_psi).reshape(-1)
res0 = self.F_function(trial_plasma_psi, self.tokamak_psi, profiles)
control_trial_psi = True
self.jtor_at_start = profiles.jtor.copy()
norm_rel_change = self.relative_norm_residual(res0, trial_plasma_psi)
rel_change, del_psi = self.relative_del_residual(res0, trial_plasma_psi)
self.relative_change = 1.0 * rel_change
self.norm_rel_change = [norm_rel_change]
args = [self.tokamak_psi, profiles]
starting_direction = np.copy(res0)
log.append(f"Initial relative error = {rel_change:.2e}")
if verbose:
for x in log:
print(x)
self.initial_rel_residual = 1.0 * rel_change
log = []
log.append("-----")
iterations = 0
while (rel_change > target_relative_tolerance) * (
iterations < max_solving_iterations
):
if rel_change > Picard_handover:
log.append("Picard iteration: " + str(iterations))
# using Picard instead of NK
if picard_flag < min(max_solving_iterations - 1, 3):
# make picard update to the flux up-down symmetric
# this combats the instability of picard iterations
res0_2d = res0.reshape(self.nx, self.ny)
res0 = 0.5 * (res0_2d + res0_2d[:, ::-1]).reshape(-1)
picard_flag += 1
else:
# update = -1.0 * res0
picard_flag = 1
# # test Picard update
# nres0 = np.linalg.norm(res0)
# successful = False
# while successful == False:
# try:
# res1 = self.F_function(
# trial_plasma_psi - res0, self.tokamak_psi, profiles
# )
# nres1 = np.linalg.norm(res1)
# successful = True
# except:
# res0 *= .8
# standard Picard update
update = -1.0 * res0
# if successful:
# if nres1 > 1.5 * nres0:
# vals = [-1, 0]
# res_vals = [nres1, nres0]
# counter_picard = 0
# while (
# res_vals[-1] < res_vals[-2]
# and successful
# and counter_picard < 10
# ):
# try:
# new_res = self.F_function(
# trial_plasma_psi + (vals[-1] + 1) * res0,
# self.tokamak_psi,
# profiles,
# )
# vals.append(vals[-1] + 1)
# res_vals.append(np.linalg.norm(new_res))
# successful = True
# counter_picard += 1
# except:
# successful = False
# if counter_picard < 10:
# # find best quadratic polyfit
# poly_coeffs = np.polyfit(vals, res_vals, deg=2)
# coeff_picard = (
# 0.5
# * max(abs(poly_coeffs[1] / poly_coeffs[0]), 0.3)
# * np.sign(poly_coeffs[1] / poly_coeffs[0])
# )
# update = -res0 * coeff_picard
# if verbose:
# print(
# "custom Picard accepted, with coeff",
# coeff_picard,
# )
else:
# using NK
log.append("-----")
log.append("Newton-Krylov iteration: " + str(iterations))
picard_flag = False
self.nksolver.Arnoldi_iteration(
x0=trial_plasma_psi.copy(),
dx=starting_direction.copy(),
R0=res0.copy(),
F_function=self.F_function,
args=args,
step_size=step_size,
scaling_with_n=scaling_with_n,
target_relative_unexplained_residual=target_relative_unexplained_residual,
max_n_directions=max_n_directions,
clip=clip,
# clip_quantiles=clip_quantiles,
)
update = 1.0 * self.nksolver.dx
if force_up_down_symmetric:
log.append("Forcing up-dpwn symmetry of the plasma.")
update = update.reshape(self.shape)
update = 0.5 * (update + update[:, ::-1]).reshape(-1)
del_update = np.amax(update) - np.amin(update)
if del_update / del_psi > max_rel_update_size:
# Reduce the size of the update as found too large
update *= np.abs(max_rel_update_size * del_psi / del_update)
log.append("Update too large, resized.")
new_residual_flag = True
while new_residual_flag:
try:
# check update does not cause the disappearance of the Opoint
n_trial_plasma_psi = trial_plasma_psi + update
new_res0 = self.F_function(
n_trial_plasma_psi, self.tokamak_psi, profiles
)
new_norm_rel_change = self.relative_norm_residual(
new_res0, n_trial_plasma_psi
)
new_rel_change, new_del_psi = self.relative_del_residual(
new_res0, n_trial_plasma_psi
)
new_residual_flag = False
except:
log.append(
"Update resizing triggered due to failure to find a critical points."
)
update *= 0.75
if new_norm_rel_change < 1.2 * self.norm_rel_change[-1]:
# accept update
trial_plasma_psi = n_trial_plasma_psi.copy()
try:
residual_collinearity = np.sum(res0 * new_res0) / (
np.linalg.norm(res0) * np.linalg.norm(new_res0)
)
res0 = 1.0 * new_res0
if (residual_collinearity > 0.9) and (picard_flag is False):
log.append(
"New starting_direction used due to collinear residuals."
)
# Generate a random Krylov vector to continue the exploration
# This is arbitrary and can be improved
starting_direction = np.sin(
np.linspace(0, 2 * np.pi, self.nx)
* 1.5
* np.random.random()
)[:, np.newaxis]
starting_direction = (
starting_direction
* np.sin(
np.linspace(0, 2 * np.pi, self.ny)
* 1.5
* np.random.random()
)[np.newaxis, :]
)
starting_direction = starting_direction.reshape(-1)
starting_direction *= trial_plasma_psi
else:
starting_direction = np.copy(res0)
except:
starting_direction = np.copy(res0)
rel_change = 1.0 * new_rel_change
norm_rel_change = 1.0 * new_norm_rel_change
del_psi = 1.0 * new_del_psi
else:
reduce_by = self.relative_change / new_rel_change
log.append("Increase in residual, update reduction triggered.")
# log.append(reduce_by)
new_residual_flag = True
while new_residual_flag:
try:
n_trial_plasma_psi = trial_plasma_psi + update * reduce_by
res0 = self.F_function(
n_trial_plasma_psi, self.tokamak_psi, profiles
)
new_residual_flag = False
except:
log.append("reduction!")
reduce_by *= 0.75
starting_direction = np.copy(res0)
trial_plasma_psi = n_trial_plasma_psi.copy()
norm_rel_change = self.relative_norm_residual(res0, trial_plasma_psi)
rel_change, del_psi = self.relative_del_residual(res0, trial_plasma_psi)
self.relative_change = 1.0 * rel_change
self.norm_rel_change.append(norm_rel_change)
log.append(f"...relative error = {rel_change:.2e}")
log.append("-----")
if verbose:
for x in log:
print(x)
log = []
iterations += 1
# update eq with new solution
eq.plasma_psi = trial_plasma_psi.reshape(self.nx, self.ny).copy()
self.port_critical(eq=eq, profiles=profiles)
if not suppress:
if rel_change > target_relative_tolerance:
print(
f"Forward static solve DID NOT CONVERGE. Tolerance {rel_change:.2e} (vs. requested {target_relative_tolerance:.2e}) reached in {int(iterations)}/{int(max_solving_iterations)} iterations."
)
else:
print(
f"Forward static solve SUCCESS. Tolerance {rel_change:.2e} (vs. requested {target_relative_tolerance:.2e}) reached in {int(iterations)}/{int(max_solving_iterations)} iterations."
)
[docs]
def get_rel_delta_psit(self, delta_current, profiles, vgreen):
"""Calculates the relative change to the tokamak_psi in the core region
caused by the requested current change 'delta_current'.
Parameters
----------
delta_current : np.array
Vector of requested current changes.
profiles : freegsnke profile object
Used to source the core mask
vgreen : np.array
The green functions of the relevant coils.
For example eq._vgreen
"""
if hasattr(profiles, "diverted_core_mask"):
if profiles.diverted_core_mask is not None:
core_mask = np.copy(profiles.diverted_core_mask)
else:
core_mask = np.ones_like(self.eqR)
else:
core_mask = np.ones_like(self.eqR)
rel_delta_psit = np.linalg.norm(
np.sum(
delta_current[:, np.newaxis, np.newaxis]
* vgreen
* core_mask[np.newaxis],
axis=0,
)
)
rel_delta_psit /= np.linalg.norm(self.tokamak_psi) + 1e-6
return rel_delta_psit
[docs]
def get_rel_delta_psi(self, new_psi, previous_psi, profiles):
"""Calculates the relative change between new_psi and previous_psi
in the core region
Parameters
----------
new_psi : np.array
Flattened flux function
previous_psi : np.array
_descrFlattened flux functioniption_
profiles : freegsnke profile object
Used to source the core mask
"""
if hasattr(profiles, "diverted_core_mask"):
if profiles.diverted_core_mask is not None:
core_mask = np.copy(profiles.diverted_core_mask)
else:
core_mask = np.ones_like(self.eqR)
else:
core_mask = np.ones_like(self.eqR)
core_mask = core_mask.reshape(-1)
rel_delta_psit = np.linalg.norm((new_psi - previous_psi) * core_mask)
rel_delta_psit /= np.linalg.norm((new_psi + previous_psi) * core_mask)
return rel_delta_psit
[docs]
def optimize_currents(
self,
eq,
profiles,
constrain,
target_relative_tolerance,
relative_psit_size=1e-3,
l2_reg=1e-12,
verbose=False,
):
"""Calculates requested current changes for the coils available to control
using the actual Jacobian rather than assuming the Jacobian is given by the
green functions.
Parameters
----------
eq : freegsnke equilibrium object
Used to source coil currents and plasma_psi
profiles : freegsnke equilibrium object
Provides the plasma properties
constrain : freegnske inverse_optimizer object
Provides information on the coils available for control
target_relative_tolerance : float
Target tolerance applied to GS when building the Jacobian
relative_psit_size : float, optional
Used to size the finite difference steps that define the Jacobian, by default 1e-3
l2_reg : float, optional
The Tichonov regularization factor applied to the least sq problem, by default 1e-12
verbose : bool
Print output
"""
self.dbdI = np.zeros((np.size(constrain.b), constrain.n_control_coils))
self.dummy_current = np.zeros(constrain.n_control_coils)
full_current_vec = np.copy(eq.tokamak.current_vec)
self.forward_solve(
eq=eq,
profiles=profiles,
target_relative_tolerance=target_relative_tolerance,
suppress=True,
)
delta_current, loss = constrain.optimize_currents(
full_currents_vec=full_current_vec,
trial_plasma_psi=eq.plasma_psi,
l2_reg=1e-12,
)
b0 = np.copy(constrain.b)
rel_delta_psit = self.get_rel_delta_psit(
delta_current, profiles, eq._vgreen[constrain.control_mask]
)
adj_factor = min(1, relative_psit_size / rel_delta_psit)
# print(delta_current, rel_delta_psit, adj_factor)
delta_current *= adj_factor
# delta_current_ = np.where(delta_current > 0.1, delta_current, 0.1*np.ones_like(delta_current))
# print(delta_current)
for i in range(constrain.n_control_coils):
if verbose:
print(
f" - calculating derivatives for coil {i+1}/{constrain.n_control_coils}"
)
currents = np.copy(self.dummy_current)
currents[i] = 1.0 * delta_current[i]
currents = full_current_vec + constrain.rebuild_full_current_vec(currents)
self.eq2 = deepcopy(eq)
self.eq2.tokamak.set_all_coil_currents(currents)
self.forward_solve(
eq=self.eq2,
profiles=profiles,
target_relative_tolerance=target_relative_tolerance,
suppress=True,
)
constrain.optimize_currents(
full_currents_vec=currents,
trial_plasma_psi=self.eq2.plasma_psi,
l2_reg=1e-12,
)
self.dbdI[:, i] = (constrain.b - b0) / delta_current[i]
if type(l2_reg) == float:
reg_matrix = l2_reg * np.eye(constrain.n_control_coils)
else:
reg_matrix = np.diag(l2_reg)
mat = np.linalg.inv(np.matmul(self.dbdI.T, self.dbdI) + reg_matrix)
Newton_delta_current = np.dot(mat, np.dot(self.dbdI.T, -b0))
loss = np.linalg.norm(b0 + np.dot(self.dbdI, Newton_delta_current))
return Newton_delta_current, loss
[docs]
def inverse_solve(
self,
eq,
profiles,
constrain,
target_relative_tolerance,
target_relative_psit_update=1e-3,
max_solving_iterations=100,
max_iter_per_update=5,
Picard_handover=0.15,
step_size=2.5,
scaling_with_n=-1.0,
target_relative_unexplained_residual=0.3,
max_n_directions=16,
clip=10,
# clip_quantiles=None,
max_rel_update_size=0.15,
threshold_val=0.18,
l2_reg=1e-9,
forward_tolerance_increase=100,
# forward_tolerance_increase_factor=1.5,
max_rel_psit=0.02,
damping_factor=0.995,
use_full_Jacobian=True,
full_jacobian_handover=[1e-5, 7e-3],
l2_reg_fj=1e-8,
force_up_down_symmetric=False,
verbose=False,
suppress=False,
):
"""Inverse solver.
Both coil currents and plasma flux are sought. Needs a set of desired magnetic constraints.
An inverse problem is specified by the 2 FreeGSNKE objects, eq and profiles,
plus a freegsnke "constrain" (or Inverse_optimizer) object.
The first specifies the metal currents (throught eq.tokamak)
The second specifies the desired plasma properties
(i.e. plasma current and profile functions).
The constrain object collects the desired magnetic constraints.
The plasma_psi which solves the given GS problem is assigned to
the input eq, and can be found at eq.plasma_psi.
The coil currents with satisfy the magnetic constraints are
assigned to eq.tokamak
Parameters
----------
eq : FreeGSNKE equilibrium object
Used to extract the assigned metal currents, which in turn are
used to calculate the according self.tokamak_psi
profiles : FreeGSNKE profile object
Specifies the target properties of the plasma.
These are used to calculate Jtor(psi)
constrain : freegsnke inverse_optimizer object
Specifies the coils available for control and the desired magnetic constraints
target_relative_tolerance : float
The desired tolerance for the plasma flux.
At fixed coil currents, this is the tolerance imposed to the GS problem.
This has to be satisfied for the inverse problem to be considered solved.
target_relative_psit_update : float
The relative update to psi_tokamak caused by the update in the control currents
has to be lower than this target value for the inverse problem to be considered
successfully solved.
max_solving_iterations : int
Maximum number of solving iterations. The solver is interuupted when this is met.
max_iter_per_update : int
Maximum number of interations allowed to the forward solver in each cycle of the inverse solver.
Picard_handover : float
Value of relative tolerance above which a Picard iteration
is performed instead of a NK call in the forward solve
step_size : float
l2 norm of proposed step, in units of the size of the residual R0
scaling_with_n : float
allows to further scale the proposed steps as a function of the
number of previous steps already attempted
(1 + self.n_it)**scaling_with_n
target_relative_unexplained_residual : float between 0 and 1
terminates internal iterations when the considered directions
can (linearly) explain such a fraction of the initial residual R0
max_n_directions : int
terminates iteration even though condition on
explained residual is not met
max_rel_update_size : float
maximum relative update, in norm, to plasma_psi. If larger than this,
the norm of the update is reduced
clip : float
maximum size of the update due to each explored direction, in units
of exploratory step used to calculate the finite difference derivative
l2_reg : either float or 1d np.array with len=self.n_control_coils
The regularization factor applied when green functions are used as Jacobians
forward_tolerance_increase : float
after coil currents are updated, the interleaved forward problems
are requested to converge to a tolerance that is tighter by a factor
forward_tolerance_increase with respect to the change in flux caused
by the current updates over the plasma core
forward_tolerance_increase_factor : float
iterations that do not result in improvement trigger an increase in the tolerance
applied to the forward problems. The increase corresponds to a factor forward_tolerance_increase_factor
max_rel_psit : float
The maximum relative change that the requested updates in the control currents are allowed to cause.
damping_factor : float
This applies a damping of damping_factor**number_of_iterations to max_rel_psit,
to encourage convergence
full_jacobian_handover : float
When the forward problems achieve this tolerance level,
self.optimize_currents is called instead of constrain.optimize_currents.
This means that the actual Jacobians are used rather than the green functions.
l2_reg_fj : float
The regularization factor applied when the full Jacobians are used.
verbose : bool
flag to allow progress printouts
suppress : bool
flag to allow suppress all printouts
"""
if suppress:
verbose = False
if verbose:
print("-----")
print("Inverse static solve starting...")
iterations = 0
damping = 1
self.rel_psit_updates = [max_rel_psit]
previous_rel_delta_psit = 1
self.constrain_loss = []
check_core_mask = False
# If not calling the solver from self.solve
# then you need to ensure that the vectorised currents are in place in tokamak object!
# Make sure any currents that are not controlled are assigned using eq.tokamak.set_coil_current()
# Otherwise the following tokamak_psi will not be the correct one!
full_currents_vec = np.copy(eq.tokamak.current_vec)
self.tokamak_psi = eq.tokamak.getPsitokamak(vgreen=eq._vgreen)
# prepare all green functions needed by the current optimizations
constrain.prepare_for_solve(eq)
check_equilibrium = False
try:
GS_residual = self.F_function(
tokamak_psi=self.tokamak_psi.reshape(-1),
plasma_psi=eq.plasma_psi.reshape(-1),
profiles=profiles,
)
if verbose:
print("Initial guess for plasma_psi successful, residual found.")
rel_change_full, del_psi = self.relative_del_residual(
GS_residual, eq.plasma_psi
)
if rel_change_full < threshold_val:
check_equilibrium = True
if profiles.diverted_core_mask is not None:
check_core_mask = True
except:
pass
if verbose:
print(f"Initial relative error = {rel_change_full:.2e}")
print("-----")
while (
(rel_change_full > target_relative_tolerance)
+ (previous_rel_delta_psit > target_relative_psit_update)
) * (iterations < max_solving_iterations):
if verbose:
print("Iteration: " + str(iterations))
if check_equilibrium:
# this_max_rel_psit = min(max_rel_psit, np.mean(self.rel_psit_updates[-6:]))
this_max_rel_psit = np.mean(self.rel_psit_updates[-6:])
this_max_rel_update_size = 1.0 * max_rel_update_size
if type(l2_reg) == float:
this_l2_reg = 1.0 * l2_reg
else:
this_l2_reg = np.array(l2_reg)
if (previous_rel_delta_psit < target_relative_psit_update) * (
rel_change_full < 50 * target_relative_tolerance
):
# use more iterations if 'close to solution'
this_max_iter_per_update = 50
else:
this_max_iter_per_update = 1.0 * max_iter_per_update
else:
this_max_rel_psit = False
this_max_rel_update_size = max(max_rel_update_size, 0.3)
this_max_iter_per_update = 1
if type(l2_reg) == float:
this_l2_reg = 1e-4 * l2_reg
else:
this_l2_reg = 1e-4 * np.array(l2_reg)
if (
use_full_Jacobian
* (rel_change_full < full_jacobian_handover[0])
* (previous_rel_delta_psit < full_jacobian_handover[1])
):
if verbose:
print(
"Using full Jacobian (of constraints wrt coil currents) to optimsise currents."
)
# use complete Jacobian: psi_plasma changes with the coil currents
delta_current, loss = self.optimize_currents(
eq=eq,
profiles=profiles,
constrain=constrain,
target_relative_tolerance=target_relative_tolerance,
relative_psit_size=this_max_rel_psit,
l2_reg=l2_reg_fj,
verbose=verbose,
)
else:
if verbose:
print(
"Using simplified Green's Jacobian (of constraints wrt coil currents) to optimise the currents."
)
# use Greens as Jacobian: i.e. psi_plasma is assumed fixed
delta_current, loss = constrain.optimize_currents(
full_currents_vec=full_currents_vec,
trial_plasma_psi=eq.plasma_psi,
l2_reg=this_l2_reg,
)
self.constrain_loss.append(loss)
rel_delta_psit = self.get_rel_delta_psit(
delta_current, profiles, eq._vgreen[constrain.control_mask]
)
# print("requested rel_delta_psit", rel_delta_psit)
if this_max_rel_psit:
# resize update to the control currents so to limit the relative change of the tokamak flux to this_max_rel_psit
if constrain.curr_loss < 1:
damping *= damping_factor
adj_factor = damping * min(1, this_max_rel_psit / rel_delta_psit)
# apply the resizing
else:
adj_factor = 1.0
delta_current *= adj_factor
previous_rel_delta_psit = rel_delta_psit * adj_factor
if check_core_mask:
# make sure that the update of the control currents does not cause a loss of the Opoint or of the Xpoints
delta_tokamak_psi = np.sum(
delta_current[:, np.newaxis, np.newaxis]
* eq._vgreen[constrain.control_mask],
axis=0,
).reshape(-1)
resize = True
while resize:
try:
GS_residual = self.F_function(
tokamak_psi=self.tokamak_psi.reshape(-1)
+ delta_tokamak_psi,
plasma_psi=eq.plasma_psi.reshape(-1),
profiles=profiles,
)
if len(profiles.xpt):
# The update is approved:
resize = False
except:
pass
if resize:
if verbose:
print("Resizing of the control current update triggered!")
delta_current *= 0.75
delta_tokamak_psi *= 0.75
previous_rel_delta_psit *= 0.75
self.rel_psit_updates.append(previous_rel_delta_psit)
# apply the update to the control currents
full_currents_vec += constrain.rebuild_full_current_vec(delta_current)
eq.tokamak.set_all_coil_currents(full_currents_vec)
if verbose:
print(
f"Change in coil currents (being controlled): {[f'{val:.2e}' for val in delta_current]}"
)
print(f"Constraint losses = {loss:.2e}")
print(
f"Relative update of tokamak psi (in plasma core): {previous_rel_delta_psit:.2e}"
)
# if loss > self.constrain_loss[-1]:
# forward_tolerance_increase *= forward_tolerance_increase_factor
# set tolerance for the upcoming forward solve
if previous_rel_delta_psit < target_relative_psit_update:
tolerance = 1.0 * target_relative_tolerance
this_max_rel_update_size = 50
else:
tolerance = max(
min(previous_rel_delta_psit / forward_tolerance_increase, 1e-3),
target_relative_tolerance,
)
# forward solve to update the plasma_psi based on the new currents
if verbose:
print(f"Handing off to forward solve (with updated currents).")
self.forward_solve(
eq,
profiles,
target_relative_tolerance=tolerance,
max_solving_iterations=this_max_iter_per_update,
Picard_handover=Picard_handover,
step_size=step_size,
scaling_with_n=scaling_with_n,
target_relative_unexplained_residual=target_relative_unexplained_residual,
max_n_directions=max_n_directions,
clip=clip,
verbose=False,
max_rel_update_size=this_max_rel_update_size,
force_up_down_symmetric=force_up_down_symmetric,
suppress=True,
)
rel_change_full = 1.0 * self.relative_change
iterations += 1
if verbose:
print(f"Relative error = {rel_change_full:.2e}")
print("-----")
if rel_change_full < threshold_val:
check_equilibrium = True
else:
check_equilibrium = False
if not suppress:
if rel_change_full > target_relative_tolerance:
print(
f"Inverse static solve DID NOT CONVERGE. Tolerance {rel_change_full:.2e} (vs. requested {target_relative_tolerance}) reached in {int(iterations)}/{int(max_solving_iterations)} iterations."
)
else:
print(
f"Inverse static solve SUCCESS. Tolerance {rel_change_full:.2e} (vs. requested {target_relative_tolerance}) reached in {int(iterations)}/{int(max_solving_iterations)} iterations."
)
[docs]
def solve(
self,
eq,
profiles,
constrain=None,
target_relative_tolerance=1e-5,
target_relative_psit_update=1e-3,
max_solving_iterations=100,
max_iter_per_update=5,
Picard_handover=0.1,
step_size=2.5,
scaling_with_n=-1.0,
target_relative_unexplained_residual=0.3,
max_n_directions=16,
clip=10,
# clip_quantiles=None,
max_rel_update_size=0.15,
l2_reg=1e-9,
forward_tolerance_increase=100,
# forward_tolerance_increase_factor=1.5,
max_rel_psit=0.01,
damping_factor=0.98,
use_full_Jacobian=True,
full_jacobian_handover=[1e-5, 7e-3],
l2_reg_fj=1e-8,
force_up_down_symmetric=False,
verbose=False,
suppress=False,
):
"""The method to solve the GS problems, both forward and inverse.
- an inverse solve is specified by the 'constrain' input,
which includes the desired constraints on the configuration of magnetic flux.
Please see inverse_solve for details.
- a forward solve has constrain=None. Please see forward_solve for details.
Parameters
----------
eq : FreeGSNKE equilibrium object
Used to extract the assigned metal currents, which in turn are
used to calculate the according self.tokamak_psi
profiles : FreeGSNKE profile object
Specifies the target properties of the plasma.
These are used to calculate Jtor(psi)
constrain : freegsnke inverse_optimizer object
Specifies the coils available for control and the desired magnetic constraints
target_relative_tolerance : float
The desired tolerance for the plasma flux.
At fixed coil currents, this is the tolerance imposed to the GS problem.
This has to be satisfied for the inverse problem to be considered solved.
target_relative_psit_update : float
The relative update to psi_tokamak caused by the update in the control currents
has to be lower than this target value for the inverse problem to be considered
successfully solved.
max_solving_iterations : int
Maximum number of solving iterations. The solver is interuupted when this is met.
max_iter_per_update : int
Maximum number of interations allowed to the forward solver in each cycle of the inverse solver.
Picard_handover : float
Value of relative tolerance above which a Picard iteration
is performed instead of a NK call in the forward solve
step_size : float
l2 norm of proposed step, in units of the size of the residual R0
scaling_with_n : float
allows to further scale the proposed steps as a function of the
number of previous steps already attempted
(1 + self.n_it)**scaling_with_n
target_relative_unexplained_residual : float between 0 and 1
terminates internal iterations when the considered directions
can (linearly) explain such a fraction of the initial residual R0
max_n_directions : int
terminates iteration even though condition on
explained residual is not met
max_rel_update_size : float
maximum relative update, in norm, to plasma_psi. If larger than this,
the norm of the update is reduced
clip : float
maximum size of the update due to each explored direction, in units
of exploratory step used to calculate the finite difference derivative
l2_reg : either float or 1d np.array with len=self.n_control_coils
The regularization factor applied when green functions are used as Jacobians
forward_tolerance_increase : float
after coil currents are updated, the interleaved forward problems
are requested to converge to a tolerance that is tighter by a factor
forward_tolerance_increase with respect to the change in flux caused
by the current updates over the plasma core
forward_tolerance_increase_factor : float
iterations that do not result in improvement trigger an increase in the tolerance
applied to the forward problems. The increase corresponds to a factor forward_tolerance_increase_factor
max_rel_psit : float
The maximum relative change that the requested updates in the control currents are allowed to cause.
damping_factor : float
This applies a damping of damping_factor**number_of_iterations to max_rel_psit,
to encourage convergence
full_jacobian_handover : float
When the forward problems achieve this tolerance level,
self.optimize_currents is called instead of constrain.optimize_currents.
This means that the actual Jacobians are used rather than the green functions.
l2_reg_fj : float
The regularization factor applied when the full Jacobians are used.
verbose : bool
flag to allow progress printouts
suppress : bool
flag to allow suppress all printouts
"""
# ensure vectorised currents are in place in tokamak object
eq.tokamak.getCurrentsVec()
# forward solve
eq._separatrix_data_flag = False
if constrain is None:
self.forward_solve(
eq=eq,
profiles=profiles,
target_relative_tolerance=target_relative_tolerance,
max_solving_iterations=max_solving_iterations,
Picard_handover=Picard_handover,
step_size=step_size,
scaling_with_n=scaling_with_n,
target_relative_unexplained_residual=target_relative_unexplained_residual,
max_n_directions=max_n_directions,
clip=clip,
verbose=verbose,
max_rel_update_size=max_rel_update_size,
force_up_down_symmetric=force_up_down_symmetric,
suppress=suppress,
)
else:
self.inverse_solve(
eq=eq,
profiles=profiles,
constrain=constrain,
target_relative_tolerance=target_relative_tolerance,
target_relative_psit_update=target_relative_psit_update,
max_solving_iterations=max_solving_iterations,
max_iter_per_update=max_iter_per_update,
Picard_handover=Picard_handover,
step_size=step_size,
scaling_with_n=scaling_with_n,
target_relative_unexplained_residual=target_relative_unexplained_residual,
max_n_directions=max_n_directions,
clip=clip,
max_rel_update_size=max_rel_update_size,
l2_reg=l2_reg,
forward_tolerance_increase=forward_tolerance_increase,
# forward_tolerance_increase_factor=forward_tolerance_increase_factor,
max_rel_psit=max_rel_psit,
damping_factor=damping_factor,
full_jacobian_handover=full_jacobian_handover,
use_full_Jacobian=use_full_Jacobian,
l2_reg_fj=l2_reg_fj,
force_up_down_symmetric=force_up_down_symmetric,
verbose=verbose,
suppress=suppress,
)
# def old_inverse_solve(
# self,
# eq,
# profiles,
# target_relative_tolerance,
# constrain,
# verbose=False,
# max_solving_iterations=20,
# max_iter_per_update=5,
# Picard_handover=0.1,
# initial_Picard=True,
# step_size=2.5,
# scaling_with_n=-1.0,
# target_relative_unexplained_residual=0.3,
# max_n_directions=16,
# clip=10,
# clip_quantiles=None,
# max_rel_update_size=0.2,
# forward_tolerance_increase=5,
# ):
# """Inverse solver using the NK implementation.
# An inverse problem is specified by the 2 FreeGSNKE objects, eq and profiles,
# plus a constrain freeGS4e object.
# The first specifies the metal currents (throught eq.tokamak)
# The second specifies the desired plasma properties
# (i.e. plasma current and profile functions).
# The constrain object collects the desired magnetic constraints.
# The plasma_psi which solves the given GS problem is assigned to
# the input eq, and can be found at eq.plasma_psi.
# The coil currents with satisfy the magnetic constraints are
# assigned to eq.tokamak
# Parameters
# ----------
# eq : FreeGSNKE equilibrium object
# Used to extract the assigned metal currents, which in turn are
# used to calculate the according self.tokamak_psi
# profiles : FreeGSNKE profile object
# Specifies the target properties of the plasma.
# These are used to calculate Jtor(psi)
# target_relative_tolerance : float
# NK iterations are interrupted when this criterion is
# satisfied. Relative convergence for the residual F(plasma_psi)
# constrain : freegs4e constrain object
# verbose : bool
# flag to allow progress printouts
# max_solving_iterations : int
# NK iterations are interrupted when this limit is surpassed
# Picard_handover : float
# Value of relative tolerance above which a Picard iteration
# is performed instead of a NK call
# step_size : float
# l2 norm of proposed step, in units of the size of the residual R0
# scaling_with_n : float
# allows to further scale the proposed steps as a function of the
# number of previous steps already attempted
# (1 + self.n_it)**scaling_with_n
# target_relative_explained_residual : float between 0 and 1
# terminates internal iterations when the considered directions
# can (linearly) explain such a fraction of the initial residual R0
# max_n_directions : int
# terminates iteration even though condition on
# explained residual is not met
# max_rel_update_size : float
# maximum relative update, in norm, to plasma_psi. If larger than this,
# the norm of the update is reduced
# clip : float
# maximum size of the update due to each explored direction, in units
# of exploratory step used to calculate the finite difference derivative
# forward_tolerance_increase : float
# after coil currents are updated, the interleaved forward problems
# are requested to converge to a tolerance that is tighter by a factor
# forward_tolerance_increase with respect to the change in flux caused
# by the current updates over the plasma core
# """
# log = []
# # self.control_coils = list(eq.tokamak.getCurrents().keys())
# # control_mask = np.arange(len(self.control_coils))[
# # np.array([eq.tokamak[coil].control for coil in self.control_coils])
# # ]
# # self.control_coils = [self.control_coils[i] for i in control_mask]
# # self.len_control_coils = len(self.control_coils)
# if initial_Picard:
# # use freegs4e Picard solver for initial steps to a shallow tolerance
# freegs4e.solve(
# eq,
# profiles,
# constrain,
# rtol=4e-2,
# show=False,
# blend=0.0,
# )
# iterations = 0
# rel_change_full = 1
# while (rel_change_full > target_relative_tolerance) * (
# iterations < max_solving_iterations
# ):
# log.append("-----")
# log.append("Newton-Krylov iteration: " + str(iterations))
# norm_delta = self.update_currents(constrain, eq, profiles)
# self.forward_solve(
# eq,
# profiles,
# target_relative_tolerance=norm_delta / forward_tolerance_increase,
# max_solving_iterations=max_iter_per_update,
# Picard_handover=Picard_handover,
# step_size=step_size,
# scaling_with_n=-scaling_with_n,
# target_relative_unexplained_residual=target_relative_unexplained_residual,
# max_n_directions=max_n_directions,
# clip=clip,
# clip_quantiles=clip_quantiles,
# verbose=False,
# max_rel_update_size=max_rel_update_size,
# )
# rel_change_full = 1.0 * self.relative_change
# iterations += 1
# log.append("...relative error = " + str(rel_change_full))
# if verbose:
# for x in log:
# print(x)
# log = []
# if iterations >= max_solving_iterations:
# warnings.warn(
# f"Inverse solve failed to converge to requested relative tolerance of "
# + f"{target_relative_tolerance} with less than {max_solving_iterations} "
# + f"iterations. Last relative psi change: {rel_change_full}. "
# + f"Last current change caused a relative update of tokamak_psi in the core of: {norm_delta}."
# )