Source code for freegsnke.virtual_circuits

"""
Defines class that represents the virtual circuits. 

Copyright 2025 UKAEA, UKRI-STFC, and The Authors, as per the COPYRIGHT and README files.

This file is part of FreeGSNKE.

FreeGSNKE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.

FreeGSNKE is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
  
You should have received a copy of the GNU Lesser General Public License
along with FreeGSNKE.  If not, see <http://www.gnu.org/licenses/>. 
"""

from datetime import datetime

import numpy as np


[docs] class VirtualCircuit: """ The class for storing/recording virtual circuits that have been built using the VirtualCircuitHandling class. """
[docs] def __init__( self, name, eq, profiles, shape_matrix, VCs_matrix, target_names, coils, target_calculator, ): """ Store the key quantities from the VirtualCircuitHandling calculations. Parameters ---------- name : str Name to call the VCs (e.g. super-X VCs). eq : object The equilibrium object used to build the VCs. profiles : object The profiles object used to build the VCs. shape_matrix : np.array The array storing the Jacobians between the targets and coils given in 'targets' and 'coils'. VCs_matrix : np.array The array storing the VCs between the targets and coils given in 'targets' and 'coils'. target_names : list A list of the target names, e.g [Rin, Rout, Rx, Zx, ...] (must be same length as array from target_calculator). coils : list The list of coils used to calculate the shape_matrix and VCs_matrix. target_calculator : function Function returning an array of the shape targets (VC will be calculated for ALL of these targets). """ self.name = name self.eq = eq self.profiles = profiles self.shape_matrix = shape_matrix self.VCs_matrix = VCs_matrix self.target_names = target_names self.coils = coils self.target_calculator = target_calculator
[docs] class VirtualCircuitHandling: """ The virtual circuits handling class. """
[docs] def __init__( self, ): """ Initialises the virtual circuits. Parameters ---------- """ # name to store the VC under self.default_VC_name = f"VC_{datetime.today().strftime('%Y%m%d')}"
[docs] def define_solver(self, solver, target_relative_tolerance=1e-7): """ Sets the solver in the VC class. Parameters ---------- solver : object The static Grad-Shafranov solver object. target_relative_tolerance : float Target relative tolerance to be met by the solver. Returns ------- None Modifies the class object in place. """ self.solver = solver self.target_relative_tolerance = target_relative_tolerance
[docs] def build_current_vec(self, eq, coils): """ For the given equilibrium, this function stores the coil currents (for those listed in 'coils') in the class object. Parameters ---------- eq : object The equilibrium object. coils : list List of strings containing the names of the coil currents to be stored. Returns ------- None Modifies the class object in place. """ # empty array for the currents self.currents_vec = np.zeros(len(coils)) # set the currents for i, coil in enumerate(coils): self.currents_vec[i] = eq.tokamak[coil].current
[docs] def assign_currents(self, currents_vec, coils, eq): """ For the given equilibrium, this function assigns the coil currents (for those listed in 'coils') in the class object. Parameters ---------- currents_vec : np.array Vector of coil currents to be assigned to the eq object using the coil names in 'coils. eq : object The equilibrium object. coils : list List of strings containing the names of the coil currents to be assigned. Returns ------- None Modifies the class object in place. """ # directly assign the currents for i, coil in enumerate(coils): eq.tokamak.set_coil_current(coil, currents_vec[i])
[docs] def assign_currents_solve_GS(self, currents_vec, coils, target_relative_tolerance): """ Assigns the coil currents in 'currents_vec' to a private equilibrium object and then solve using the static GS solver. Parameters ---------- currents_vec : np.array Input current values to be assigned. Format as in self.assign_currents. coils : list List of strings containing the names of the coil currents to be assigned. target_relative_tolerance : float Target relative tolerance to be met by the solver. Returns ------- None Modifies the class (and other private) object(s) in place. """ # assign currents self.assign_currents(currents_vec, coils, eq=self._eq2) # solve for equilibrium try: self.solver.forward_solve( self._eq2, self._profiles2, target_relative_tolerance=target_relative_tolerance, # suppress=True, ) except AttributeError: raise AttributeError("Solver not defined. Call define_solver() first.")
[docs] def prepare_build_dIydI_j( self, j, coils, target_dIy, starting_dI, min_curr=1e-4, max_curr=300 ): """ Prepares to compute the term d(Iy)/dI_j of the Jacobian by inferring the value of delta(I_j) corresponding to a change delta(I_y) with norm(delta(I_y)) = target_dIy. Here: - Iy is the flattened vector of plasma currents (on the computational grid). - I_j is the current in the jth coil. Parameters ---------- j : int Index identifying the current to be varied. Indexes as in self.currents_vec. coils : list List of strings containing the names of the coil currents to be assigned. target_dIy : float Target value for the norm of delta(I_y), from which the finite difference derivative is calculated. starting_dI : float Initial value to be used as delta(I_j) to infer the slope of norm(delta(I_y))/delta(I_j). min_curr : float, optional, by default 1e-4 If inferred current value is below min_curr, clip to min_curr. max_curr : int, optional, by default 300 If inferred current value is above max_curr, clip to max_curr. Returns ------- None Modifies the class (and other private) object(s) in place. """ # copy of currents currents = np.copy(self.currents_vec) # perturb current j currents[j] += starting_dI # assign current to the coil and solve static GS problem self.assign_currents_solve_GS(currents, coils, self.target_relative_tolerance) # difference between plasma current vectors (before and after the solve) dIy_0 = self._eq2.limiter_handler.Iy_from_jtor(self._profiles2.jtor) - self.Iy # relative norm of plasma current change norm_dIy_0 = np.linalg.norm(dIy_0) if norm_dIy_0 < 1e-10: raise ZeroDivisionError( "Norm of change in jtor is near-zero for this Jacobian, please increase 'starting_dI' parameter." ) else: rel_ndIy_0 = norm_dIy_0 / self._nIy # scale the starting_dI to match the target final_dI = starting_dI * target_dIy / rel_ndIy_0 # clip small/large currents final_dI = np.clip(final_dI, min_curr, max_curr) # store self.final_dI_record[j] = final_dI
[docs] def build_dIydI_j( self, j, coils, verbose=False, ): """ Computes the term d(Iy)/dI_j of the Jacobian as a finite difference derivative, using the value of delta(I_j) inferred earlier by self.prepare_build_dIydI_j. Here: - Iy is the flattened vector of plasma currents (on the computational grid). - I_j is the current in the jth coil. Parameters ---------- j : int Index identifying the current to be varied. Indexes as in self.currents_vec. coils : list List of strings containing the names of the coil currents to be assigned. verbose: bool Display output (or not). Returns ------- None VC object modifed in place. """ # print some output if verbose: print(f"Coil {coils[j]}") # store dI final_dI = 1.0 * self.final_dI_record[j] # copy of currents currents = np.copy(self.currents_vec) # perturb current currents[j] += final_dI # assign current to the coil and solve static GS problem self.assign_currents_solve_GS(currents, coils, self.target_relative_tolerance) # calculate finite difference of targets wrt to the coil current self._target_vec_1 = self.target_calculator(self._eq2) dtargets = self._target_vec_1 - self._targets_vec return dtargets / final_dI
[docs] def calculate_VC( self, eq, profiles, coils, target_names, target_calculator, target_dIy=1e-3, starting_dI=None, min_starting_dI=50, verbose=False, name=None, ): """ Calculate the "virtual circuits" matrix: V = (S^T S)^(-1) S^T, which is the Moore-Penrose pseudo-inverse of the shape (Jacobian) matrix S: S_ij = dT_i / dI_j. This represents the sensitivity of target parameters T_i to changes in coil currents I_j. Parameters ---------- eq : object The equilibrium object. profiles : object The profiles object. coils : list List of strings containing the names of the coil currents to be assigned. target_names : list A list of the target names, e.g [Rin, Rout, Rx, Zx, ...] (must be same length as array from target_calculator). target_calculator : function Function returning an array of the shape targets (VC will be calculated for ALL of these targets). target_dIy : float Target value for the norm of delta(I_y), from which the finite difference derivative is calculated. starting_dI : array Initial current perturbations [Amps] to be used as delta(I_j) to infer the slope of norm(delta(I_y))/delta(I_j). min_starting_dI : float Minimum starting_dI value to be used as delta(I_j): to infer the slope of norm(delta(I_y))/delta(I_j). verbose: bool Display output (or not). name: str Name to store the VC under (in the 'VirtualCircuit' class). Returns ------- None Modifies the class (and other private) object(s) in place. """ # store original currents self.build_current_vec(eq, coils) # store function to calculate targets from equilibrium if target_calculator is None: raise ValueError("You need to input a 'target_calculator' function!") self.target_calculator = target_calculator # calculate the targets from the equilibrium self._targets_vec = self.target_calculator(eq) if target_names is None: raise ValueError("You need to input a list of 'target_names'!") elif len(target_names) != len(self._targets_vec): raise ValueError( "Number of 'target_names' does not match length of array from 'target_calculator' function!" ) self.target_names = target_names # solve static GS problem (it's already solved?) try: self.solver.forward_solve( eq=eq, profiles=profiles, target_relative_tolerance=self.target_relative_tolerance, suppress=True, ) except AttributeError: raise AttributeError("Solver not defined. Call define_solver() first.") # store the flattened plasma current vector (and its norm) self.Iy = eq.limiter_handler.Iy_from_jtor(profiles.jtor).copy() self._nIy = np.linalg.norm(self.Iy) # define starting_dI using currents if not given if starting_dI is None: starting_dI = np.abs(self.currents_vec.copy()) * target_dIy starting_dI = np.where( starting_dI > min_starting_dI, starting_dI, min_starting_dI ) if verbose: print("--- Stage one ---") print( f"Re-sizing each initial coil current shift so that it produces a {np.round(target_dIy*100,2)}% change in plasma current density from the input equilibrium." ) # storage matrices shape_matrix = np.zeros((len(self._targets_vec), len(coils))) self.final_dI_record = np.zeros(len(coils)) # make copies of the newly solved equilibrium and profile objects # these are used for all GS solves below self._eq2 = eq.create_auxiliary_equilibrium() self._profiles2 = profiles.copy() # for each coil, prepare by inferring delta(I_j) corresponding to a change delta(I_y) # with norm(delta(I_y)) = target_dIy for j in np.arange(len(coils)): self.prepare_build_dIydI_j(j, coils, target_dIy, starting_dI[j]) if verbose: print( f"Coil {coils[j]} (original current shift = {np.round(starting_dI[j],2)} [A] --> scaled current shift {np.round(self.final_dI_record[j],2)} [A])." ) if verbose: print("--- Stage two ---") print( "Building the shape matrix (Jacobian) of the shape parameter changes wrt scaled current shifts for each coil:" ) # for each coil, build the Jacobian using the value of delta(I_j) inferred earlier # by self.prepare_build_dIydI_j. for j in np.arange(len(coils)): # each shape matrix row is derivative of targets wrt the final coil current change shape_matrix[:, j] = self.build_dIydI_j(j, coils, verbose) # store the data in its own (new) class if name is None: name = self.default_VC_name print("--- Stage three ---") print("Inverting the shape matrix to get the virtual circuit matrix.") print(f"VC object stored under name: '{name}'.") # store the VC object dynamically store_VC = VirtualCircuit( name=name, eq=eq, profiles=profiles, shape_matrix=shape_matrix, VCs_matrix=np.linalg.pinv( shape_matrix ), # "virtual circuits" are the pseudo-inverse of the shape matrix target_names=target_names, coils=coils, target_calculator=target_calculator, ) setattr(self, name, store_VC)
[docs] def apply_VC( self, eq, profiles, VC_object, requested_target_shifts, verbose=False, ): """ Here we apply the VC matrix V to requested shifts in the target quantities (dT), obtaining the shift in the currents (in coils, dI) required to achieve this: dI = V * dT. Applying the current shifts to the existing currents, we re-solve the equilibrium and return to user. Parameters ---------- eq : object The equilibrium object upon which to apply the VCs. profiles : object The profiles object upon which to apply the VCs. VC_object : an instance of the VirtualCircuit class Specifies the virtual circuit matrix and properties. requested_target_shifts : list List of floats containing the shifts in all of the relevant targets. Same order as VC_object.target_names. verbose: bool Display output (or not). Returns ------- object Returns the equilibrium object after applying the shifted currents. object Returns the profiles object after applying the shifted currents. np.array Array of new target values. np.array Array of old target values. """ # verify targets, coils, and shifts all match those used to generate VCs if len(requested_target_shifts) != VC_object.VCs_matrix.shape[1]: raise ValueError( "The length of 'requested_target_shifts' does not match the list of targets " "associated with the supplied VC object!" ) # calculate current shifts required using shape matrix (for stability) # uses least squares solver to solve S*dI = dT # where dT are the target shifts and dI the current shifts current_shifts = np.linalg.lstsq( VC_object.shape_matrix, np.array(requested_target_shifts), rcond=None )[0] if verbose: print(f"Currents shifts from VCs:") print(f"{VC_object.coils} = {current_shifts}.") # re-solve static GS problem (to make sure it's solved already) try: self.solver.forward_solve( eq=eq, profiles=profiles, target_relative_tolerance=self.target_relative_tolerance, suppress=True, ) except AttributeError: raise AttributeError("Solver not defined. Call define_solver() first.") # calculate the targets # if not hasattr(self, "target_calculator"): # self.target_calculator = VC_object.target_calculator old_target_values = VC_object.target_calculator(eq) # store copies of the eq and profile objects eq_new = eq.create_auxiliary_equilibrium() profiles_new = profiles.copy() # assign currents to the required coils in the eq object new_currents = [ eq_new.tokamak.getCurrents()[name] + current_shifts[i] for i, name in enumerate(VC_object.coils) ] self.assign_currents(new_currents, VC_object.coils, eq=eq_new) # solve for the new equilibrium try: self.solver.forward_solve( eq_new, profiles_new, target_relative_tolerance=self.target_relative_tolerance, suppress=True, ) except AttributeError: raise AttributeError("Solver not defined. Call define_solver() first.") # calculate new target values and the difference vs. the old new_target_values = VC_object.target_calculator(eq_new) if verbose: print(f"Targets shifts from VCs:") print( f"{VC_object.target_names} = {new_target_values - old_target_values}." ) return eq_new, profiles_new, new_target_values, old_target_values