From 04b0894464783601bb1d75bef68fdcbb4f763762 Mon Sep 17 00:00:00 2001 From: Andy Date: Tue, 23 Jan 2024 18:06:06 +0100 Subject: [PATCH] PCA updates --- PCA/PCA.py | 165 +++++++++++--- PCA/PCAmovies.py | 503 +++++++++++++++++++++++++++++++++++++++++ Selection/MolSys.py | 180 +++------------ Selection/MovieSys.py | 186 +++++++++++++++ chimeraX/CMXEvents.py | 18 +- chimeraX/MovieTools.py | 148 ++++++++++++ chimeraX/Movies.py | 2 +- 7 files changed, 1016 insertions(+), 186 deletions(-) create mode 100644 PCA/PCAmovies.py create mode 100644 Selection/MovieSys.py create mode 100644 chimeraX/MovieTools.py diff --git a/PCA/PCA.py b/PCA/PCA.py index 42d782e..9e45140 100644 --- a/PCA/PCA.py +++ b/PCA/PCA.py @@ -17,6 +17,7 @@ from pyDR import clsDict from copy import copy,deepcopy from pyDR.misc.tools import linear_ex +from .PCAmovies import PCAmovies dtype=Defaults['dtype'] @@ -37,9 +38,12 @@ def __init__(self,select,align_ref='name CA'): self.align_ref=align_ref self.clear() self._source=clsDict['Source']('PCAmode') + self._n=None self._select_bond=self.select.select_bond + self._movie=None + assert self.select._mdmode==True,'select._mdmode could not be set to True. Multi-atom selections are not allowed in PCA' @property @@ -56,7 +60,7 @@ def clear(self): """ keys=['_pos','_covar','_sel1index','_sel2index','_lambda','_PC', - '_pcamp','_Ct','_data','_mean','_S2m','_S20m','_Ctdirect','_tcavg','_Ctprod','_fits','_mf'] + '_pcamp','_Ct','_data','_noopt','_fit','_directfit','_prodfit','_mean','_S2m','_S20m','_Am','_Ctdirect','_tcavg','_Ctprod','_mf'] for k in keys:setattr(self,k,None) return self @@ -561,6 +565,18 @@ def tc_index(self): """ return np.argsort(self.tc_avg) + + @property + def tc_rev_index(self): + """ + Returns the index to reverse tc sorting + + Returns + ------- + None. + + """ + return np.unique(self.tc_avg,return_inverse=True)[1] #%% Vector and Distance functions @@ -780,6 +796,24 @@ def S2m(self): S20m=np.concatenate((np.ones([1,self.nbonds]),S20m),axis=0) self._S2m=S20m[1:]/S20m[:-1] return self._S2m + + @property + def Am(self): + """ + Calculates the amplitude from each principal component to the bonds. + + + Returns + ------- + None. + + """ + + if self._Am is None: + S20m=np.concatenate([np.ones((1,self.S20m.shape[1])),self.S20m],axis=0) + self._Am=S20m[:-1]-S20m[1:] + return self._Am + #%% Correlation functions of the bonds @@ -933,7 +967,64 @@ def PCA2data(self): return copy(self._data) - def prod_v_direct(self,n=8): + def PCA2noopt(self,n=12): + if self._noopt is None or self._noopt.R.shape[1]!=n: + d=self.PCA2data() + d.detect.r_no_opt(12) + self._noopt=d.fit() + return self._noopt + + + @property + def n(self): + """ + Default number of detectors used to analyze a trajectory. Can also be + set by the user + + Returns + ------- + int + + """ + if self._n is None: + self._n=np.round(np.log10(self.traj.__len__())*2).astype(int) + return self._n + + @n.setter + def n(self,n): + self._n=n + + def PCA2fit(self,n=None): + if n is None:n=self.n + if self._fit is None or self._fit.R.shape[1]!=n: + noopt=self.PCA2noopt(max([12,n])) + noopt.detect.r_auto(n) + self._fit=noopt.fit() + return self._fit + + def direct2fit(self,n=None): + if n is None:n=self.n + if self._directfit is None or self._directfit.R.shape[1]!=n: + d=self.direct2data() + d.detect.r_no_opt(max([12,n])) + noopt=d.fit() + noopt.detect.r_auto(n) + self._directfit=noopt.fit() + return self._directfit + + def prod2fit(self,n=None): + if n is None:n=self.n + if self._prodfit is None or self._prodfit.R.shape[1]!=n: + d=self.prod2data() + d.detect.r_no_opt(max([12,n])) + noopt=d.fit() + noopt.detect.r_auto(n) + self._prodfit=noopt.fit() + return self._prodfit + + + + def prod_v_direct(self,n=None): """ Plots a comparison of the directly calculated and PCA-derived detector analysis of the bonds @@ -951,26 +1042,14 @@ def prod_v_direct(self,n=8): self.project.current_plot=self.project.current_plot+1 - if self._fits is None: - - data=self.direct2data() - data.detect.r_no_opt(12) - no=data.fit() - no.detect.r_auto(n) - fit=no.fit() - - pdata=self.prod2data() - pdata.detect.r_no_opt(12) - pno=pdata.fit() - pno.detect.r_auto(n) - pfit=pno.fit() - self._fits=fit,pfit + fit=self.direct2fit(n=n) + pfit=self.prod2fit(n=n) - self._fits[0].plot() - self._fits[1].plot() + fit.plot() + pfit.plot() - return self._fits + return fit,pfit #%% Estimating distributions @@ -1095,7 +1174,7 @@ def bond_dist(self,z=None): return z,A #%% Chimera videos - def rho_from_PCA(self,sens=None,rho_index:int=None,index:int=None): + def rho_from_PCA(self,PCAfit=None): """ Determines the contributions of each principal component to a given detector response. @@ -1114,26 +1193,35 @@ def rho_from_PCA(self,sens=None,rho_index:int=None,index:int=None): None. """ + + if PCAfit is None:PCAfit=self.PCA2fit() + + + rhoPCA=np.array([self.Am.T*R[self.tc_index] for R in PCAfit.R.T]) + return rhoPCA + - if rho_index is not None and sens is not None: - rhoz=sens.rhoz[rho_index] - z0=sens.z - else: - z0=np.linspace(*Defaults['zrange']) - rhoz=np.ones(z0.shape) + # if rho_index is not None and sens is not None: + # rhoz=sens.rhoz[rho_index] + # z0=sens.z + # else: + # z0=np.linspace(*Defaults['zrange']) + # rhoz=np.ones(z0.shape) - zf,Af=self.fit_PC()[:2] - zf,Af=zf[:,self.tc_index],Af[:,self.tc_index] + # zf,Af=self.fit_PC()[:2] + # zf,Af=zf[:,self.tc_index],Af[:,self.tc_index] - Am=1-self.S2m[:,index] + # Am=self.Am[:,index] - Afsc=Af*np.array([linear_ex(z0,rhoz,zf0) for zf0 in zf]) + # Afsc=Af*np.array([linear_ex(z0,rhoz,zf0) for zf0 in zf]) - A=(Afsc*Am).sum(0) + # A=(Afsc*Am).sum(0) + + # return A - def PCAweight(self,rho_index:int=None,index:int=None,frac:float=0.75,scaling:float=1): + def PCAweight(self,rho_index:int,index:int,PCAfit=None,frac:float=0.75): """ Determines the weighting of the principal components that accounts for most of a detector response for a given bond. By default, we account @@ -1147,9 +1235,20 @@ def PCAweight(self,rho_index:int=None,index:int=None,frac:float=0.75,scaling:flo can be modified by setting frac=0.75 """ - pass + A=self.rho_from_PCA(PCAfit)[rho_index][index] + + _,i,j=np.unique(A,return_index=True,return_inverse=True) + N=np.argmax(np.cumsum(A[i])/A.sum()>frac) + + + + @property + def movie(self): + if self._movie is None: + self._movie=PCAmovies(self) + return self._movie #%% PDB writing / Chimera def write_pdb(self,n:int=0,A:float=None,PCamp:list=None,filename:str=None): diff --git a/PCA/PCAmovies.py b/PCA/PCAmovies.py new file mode 100644 index 0000000..ca6c2e9 --- /dev/null +++ b/PCA/PCAmovies.py @@ -0,0 +1,503 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Mon Jan 22 12:57:46 2024 + +@author: albertsmith +""" + +import numpy as np +from ..chimeraX.MovieTools import lin_axis,log_axis,timescale_swp +from MDAnalysis import Writer +from copy import copy +import os + +class PCAmovies(): + def __init__(self,pca): + """ + Creates trajectories based on principal component analysis. + + Provide the PCA object and optionally a data object with fits of the + principal component (results of fitting pca.PCA2data()) + + Parameters + ---------- + pca : TYPE + DESCRIPTION. + + Returns + ------- + None. + + """ + self.pca=pca + self._n=None + self._data=None + self._direct_data=None + self._zrange=None + self._timescale=None + self._xtc=None + self._select=None + self._options=Options(self) + + + @property + def n(self): + """ + Default number of detectors used to analyze a trajectory. Can also be + set by the user + + Returns + ------- + int + + """ + if self._n is None: + return self.pca.n + + @n.setter + def n(self,n): + self._n=n + + @property + def data(self): + """ + Processed PCA detector analysis. Can be slow at the first call + + Returns + ------- + data + data object containing a detector analysis of the PCA. + + """ + if self._data is None: + d=self.pca.PCA2data() + d.detect.r_no_opt(self.n) + noo=d.fit() + noo.detect.r_auto(self.n) + self._data=noo.fit() + + return self._data + + + @data.setter + def data(self,data): + self._data=data + self._zrange=None + + @property + def direct_data(self): + """ + Data object containing direct analysis of the bond motion (no PCA). + + Returns + ------- + None. + + """ + if self._direct_data is None: + d=self.pca.direct2data() + d.detect.r_no_opt(self.n) + noo=d.fit() + noo.detect.r_auto(self.n) + self._direct_data=noo.fit() + return self._direct_data + + @direct_data.setter + def direct_data(self,data): + self._direct_data=data + + @property + def sens(self): + """ + Reference to the data's sensitivity object + + Returns + ------- + sens + + """ + return self.data.sens + + @property + def select(self): + """ + Selection object that only has atoms included in the pca + + Returns + ------- + MolSelect + + """ + if self._select is None: + self._select=self.pca.select.new_molsys_from_sel(self.pca.atoms) + return self._select + + @property + def molsys(self): + """ + Molsys with only atoms included in the pca + + Returns + ------- + MolSys + + """ + return self.select.molsys + + @property + def directory(self): + """ + Reference to the molsys default directory + + Returns + ------- + str + + """ + return self.molsys.directory + + @property + def options(self): + return self._options + + @property + def zrange(self): + """ + Default zrange for the given data object + + Returns + ------- + tuple + 2 elements specifying the z-range for this data object + + """ + if self._zrange is None: + rhoz=self.sens.rhoz[0] + + if rhoz[0]/rhoz.max()>0.9: + i=np.argmin(np.abs(rhoz/rhoz.max()-0.75)) + zrange=[self.sens.z[i]] + else: + zrange=[self.sens.info['z0'][0]] + + rhoz=self.sens.rhoz[-1] + if rhoz[-1]/rhoz.max()>0.9: + i=np.argmin(np.abs(rhoz/rhoz.max()-0.75)) + zrange.append(self.sens.z[i]) + else: + zrange.append(self.sens.info['z0'][-1]) + + self._zrange=zrange + return self._zrange + + @zrange.setter + def zrange(self,zrange): + assert len(zrange)==2,'zrange must have two elements' + self._zrange=zrange + + + def setup_swp(self,zrange=None,nframes:int=300,framerate:int=15): + """ + Prepares time axes required for sweeping the timescale. zrange may + be provided, but otherwise it will be calculated from the stored + sensitivity object. nframes defines the number of frames used in the + sweep and framerate defines how many frames that will be displayed per + second + + Parameters + ---------- + zrange : tuple/list, optional + Two elements given the range of correlation times (as log values) + The default is None, which will use internal values + nframes : int, optional + Number of frames in trajectory. The default is 150. + framerate : int, optional + Framerate for movie. The default is 15. + + Returns + ------- + dict with keys + tscale_swp : sampled timescales + t: time axis for the trajectory + index: frame index for the trajectory + + """ + out={} + if zrange is None:zrange=self.zrange + out['tscale_swp']=timescale_swp(zrange=zrange,nframes=nframes) + out['t'],out['index']=log_axis(traj=self.pca.traj,zrange=zrange,nframes=nframes,framerate=framerate) + + return out + + def weight(self,timescale): + """ + Returns a weighting of the principal components for a given timescale + or timescales. + + This is determined from the stored sensitity object and the relative + amplitudes of the detectors at the given timescale. + + Parameters + ---------- + timescale : float,array + timescale or timescales given on a log-scale. The default is None. + + Returns + ------- + np.array + + """ + if timescale is None: + timescale=timescale_swp(self.zrange) + timescale=np.atleast_1d(timescale) + + i=np.array([np.argmin(np.abs(ts-self.sens.z)) for ts in timescale],dtype=int) + wt0=self.sens.rhoz[:,i] + # wt0/=wt0.sum(0) + + return self.data.R@wt0 + + # def xtc_log_swp2(self,filename:str='temp.xtc',zrange=None,nframes:int=300,framerate:int=15): + # """ + # Create an xtc file for a log sweep of the timescales. Setup is the output + # of setup_swp, which can be omitted if default values are desired. + + # filename is the output filename of the xtc. If filetype (eg. .xtc) is + # omitted, then .xtc will be appended. It will be put into the molsys + # temporary folder by default + + # Parameters + # ---------- + # filename : str, optional + # DESCRIPTION. The default is 'temp.xtc'. + # setup : TYPE, optional + # DESCRIPTION. The default is None. + + # Returns + # ------- + # self + + # """ + + # setup=self.setup_swp(zrange=zrange,nframes=nframes,framerate=framerate) + + # wt=self.weight(setup['tscale_swp']) + # i=setup['index'] + + # ag=copy(self.pca.atoms) + + # filename=os.path.join(self.directory,filename) + + # pos0=pos=copy(self.pca.pos[0]) + # DelPC=np.concatenate([np.zeros([self.pca.PCamp.shape[0],1]), + # np.diff(self.pca.PCamp[:,i],axis=-1)],axis=-1)*wt + + + + # with Writer(filename, n_atoms=len(ag)) as w: + # for k,_ in enumerate(self.pca.traj[i]): + # # ag.positions=self.pca.mean+((wt[:,k]*self.pca.PCamp[:,i[k]])*self.pca.PCxyz).sum(-1).T + # pos+=(DelPC[:,k]*self.pca.PCxyz).sum(-1).T + # ag.positions=pos + # w.write(ag) + + # self._xtc=filename + + # return self + + def xtc_log_swp(self,filename:str='temp.xtc',zrange=None,nframes:int=300,framerate:int=15): + """ + Create an xtc file for a log sweep of the timescales. Setup is the output + of setup_swp, which can be omitted if default values are desired. + + filename is the output filename of the xtc. If filetype (eg. .xtc) is + omitted, then .xtc will be appended. It will be put into the molsys + temporary folder by default + + Parameters + ---------- + filename : str, optional + DESCRIPTION. The default is 'temp.xtc'. + setup : TYPE, optional + DESCRIPTION. The default is None. + + Returns + ------- + self + + """ + self.options.clear() + + setup=self.setup_swp(zrange=zrange,nframes=nframes,framerate=framerate) + self.setup=setup + + wt=self.weight(setup['tscale_swp']) + # wt=(wt.T/wt.mean(-1)).T + i=setup['index'] + + ag=copy(self.pca.atoms) + + filename=os.path.join(self.directory,filename) + + pos0=pos=copy(self.pca.pos[0]) + DelPC=np.concatenate([np.zeros([self.pca.PCamp.shape[0],1]), + np.diff(self.pca.PCamp[:,i],axis=-1)],axis=-1) + + PCcorr=self.pca.PCamp[:,i[-1]]-self.pca.PCamp[:,0]-DelPC.sum(-1) + + + PCamp=np.cumsum(DelPC,axis=-1)+np.atleast_2d(PCcorr).T@np.atleast_2d(i)/i[-1] + + pos=self.pca.pos[0].T+np.array([(self.pca.PCxyz*a).sum(-1) for a in PCamp.T]) + + # pos=self.pca.pos[0]+(np.cumsum(DelPC,axis=-1)*self.pca.PCxyz).sum(-1).T+\ + # +(np.atleast_2d(PCcorr).T@np.atleast_2d(i)/i[-1]) + + # pos=self.pca.pos[0]+np.array([]) + + + + + with Writer(filename, n_atoms=len(ag)) as w: + for pos0 in pos: + # ag.positions=self.pca.mean+((wt[:,k]*self.pca.PCamp[:,i[k]])*self.pca.PCxyz).sum(-1).T + ag.positions=pos0.T + w.write(ag) + + self._xtc=filename + self.options.DetFader().TimescaleIndicator() + + return self + + def play_xtc(self): + """ + Plays the last xtc created. Will also create the xtc if none is found + saved + + Returns + ------- + self + + """ + if self._xtc is None:self.xtc_log_swp() + self.molsys.new_trajectory(self._xtc) + self.molsys.movie() + self.options() + + return self + +class Options(): + def __init__(self,pca_movie): + self.dict={} + self.pca_movie=pca_movie + + def __call__(self): + for f in self.dict.values():f() + + def clear(self): + for k in self.dict: + self.remove_event(k) + self.dict={} + return self + + @property + def CMX(self): + return self.pca_movie.molsys.movie.CMX + + @property + def CMXid(self): + return self.pca_movie.molsys.movie.CMXid + + def add_event(self,name,*args): + self.CMX.add_event(self.CMXid,name,*args) + + def remove_event(self,name): + self.CMX.remove_event(self.CMXid,name) + + def TimescaleIndicator(self,tau=None,remove:bool=False): + """ + Add a timescale indicator to the trajectory + + Parameters + ---------- + tau : np.array + Timescales for each frame of the trajectory. + remove : bool + Remove the timescale indicator + + Returns + ------- + None. + + """ + if tau is None:tau=10**self.pca_movie.setup['tscale_swp']*1e9 + if remove: + if __name__ in self.dict:self.dict.pop(__name__) + return + if tau is not None: + self.dict['TimescaleIndicator']=lambda tau=tau:self.add_event('TimescaleIndicator', tau) + return self + + def DetFader(self,tau=None,index=None,rho_index=None,remove:bool=False): + """ + Add a Detector Fader to the trajectory + + Parameters + ---------- + tau : TYPE, optional + DESCRIPTION. The default is None. + index : TYPE, optional + DESCRIPTION. The default is None. + rho_index : TYPE, optional + DESCRIPTION. The default is None. + remove : bool, optional + DESCRIPTION. The default is False. + + Returns + ------- + self + + """ + + if tau is None:tau=10**self.pca_movie.setup['tscale_swp'] + + if remove: + if __name__ in self.dict:self.dict.pop(__name__) + return + + data=self.pca_movie.direct_data + rhoz=data.sens.rhoz + + if index is None:index=np.ones(data.R.shape[0],dtype=bool) + x=data.R[index] + + + if rho_index is None: + rho_index=np.ones(data.R.shape[1],dtype=bool) + if rhoz[-1][-1]>0.9:rho_index[-1]=False + + x=x[:,rho_index] + rhoz=rhoz[rho_index] + + tc=data.sens.tc + ids=[a.ids for a in self.pca_movie.select.repr_sel[index]] + + + + + + + def fun(x=x,ids=ids,tau=tau,rhoz=rhoz): + mn0,mn=self.pca_movie.molsys.movie.mdlnums + self.add_event('DetectorFader',mn,x,ids,tau*1e9,rhoz,tc,5) + self.CMX.command_line(self.CMXid,f'color #{mn0} tan') + self.CMX.command_line(self.CMXid,f'~ribbon #{mn0}') + self.CMX.command_line(self.CMXid,f'show #{mn0}') + self.CMX.command_line(self.CMXid,f'style #{mn0} ball') + + self.dict['DetFader']=fun + + return self + + \ No newline at end of file diff --git a/Selection/MolSys.py b/Selection/MolSys.py index 2de1ca9..37d9131 100644 --- a/Selection/MolSys.py +++ b/Selection/MolSys.py @@ -15,6 +15,7 @@ from pyDR import Defaults,clsDict from copy import copy import os +from pyDR.Selection.MovieSys import MovieSys dtype=Defaults['dtype'] class MolSys(): @@ -54,6 +55,7 @@ def __init__(self,topo:str,traj_files:list=None,t0:int=0,tf:int=None,step:int=1, MolSys. """ + self._directory=None if topo is not None and len(topo)==4 and not(os.path.exists(topo)): topo=getPDB(topo) @@ -87,6 +89,8 @@ def __init__(self,topo:str,traj_files:list=None,t0:int=0,tf:int=None,step:int=1, self.project=project self._movie=None + + @property def uni(self): @@ -107,6 +111,26 @@ def details(self): out.extend(self.traj.details) return out + @property + def directory(self): + """ + Returns a working directory for temporary files + + Returns + ------- + str + + """ + if self._directory is None: + # Set up temp directory, filename + if self.project is None or self.project.directory is None: + directory=os.path.abspath('temp_pyDR') + else: + directory=os.path.join(self.project.directory,'temp_pyDR') + if not(os.path.exists(directory)):os.mkdir(directory) + self._directory=directory + return self._directory + @property def _hash(self): @@ -267,12 +291,7 @@ def new_molsys_from_sel(self,sel): # Check that the universe is the same assert self.uni==sel.universe,'Atomgroup must have the same universe as MolSelect object' - # Set up temp directory, filename - if self.project is None or self.project.directory is None: - directory=os.path.abspath('temp_pyDR') - else: - directory=os.path.join(self.project.directory,'temp_pyDR') - if not(os.path.exists(directory)):os.mkdir(directory) + directory=self.directory filename=os.path.split(self.topo)[1].split('.')[0] filename0=filename @@ -327,7 +346,7 @@ def new_trajectory(self,traj_files,t0:int=0,tf:int=None,step:int=1,dt:float=None def __del__(self): """ - Deletes topology if temporary + Deletes topology and working directory if temporary Returns ------- @@ -336,8 +355,10 @@ def __del__(self): """ if os.path.split(os.path.split(self.topo)[0])[1]=='temp_pyDR': os.remove(self.topo) - if len(os.listdir(os.path.split(self.topo)[0]))==0: - os.rmdir(os.path.split(self.topo)[0]) + + if self._directory is not None: + if len(os.listdir(self.directory))==0: + os.rmdir(self.directory) @property def movie(self): @@ -357,148 +378,7 @@ def movie(self): if self._movie is None:self._movie=MovieSys(self) return self._movie -class MovieSys(): - def __init__(self,molsys): - self.molsys=molsys - self._mdlnums=None - self._ID=None - self.CMX=clsDict['CMXRemote'] - - @property - def project(self): - return self.molsys.project - - @property - def traj(self): - return self.molsys.traj - - @property - def CMXid(self): - """ - ID of the current ChimeraX session. If there is no session, or the - previous session was closed or disconnected, then this will launch - a new ChimeraX session and update the ID. - - In case of a new launch, previous mdlnums are deleted since we have lost - access to them. - - Returns - ------- - int - ChimeraX session ID - - """ - if self._ID is not None and not(self.CMX.isConnected(self._ID)): - self._ID=None - self._mdlnums=None - - if self._ID is None: - if self.project is not None: - if self.project.chimera.CMXid is None: - self.project.chimera.current=0 - self._ID=self.project.chimera.CMXid - else: - self._ID=self.CMX.launch() - - return self._ID - - - - @property - def mdlnums(self): - if self._mdlnums is not None and self._mdlnums[0] not in self.CMX.valid_models(self.CMXid): - self._mdlnums=None - return self._mdlnums - - @mdlnums.setter - def mdlnums(self,mdlnums): - self._mdlnums=mdlnums - - def open_pdb(self): - """ - Launches a movie of the stored trajectory in ChimeraX - - Returns - ------- - self - - """ - - CMX=self.CMX - ID=self.CMXid - - om=CMX.how_many_models(ID) - CMX.send_command(ID,f'open "{self.molsys.topo}" coordset true') - while om==CMX.how_many_models(ID): - pass - self.mdlnums=CMX.valid_models(ID)[-1],CMX.how_many_models(ID)-1 - return self - - def update_traj(self): - """ - Replaces the current coordset in Chimera with whatever trajectory is - currently stored in the molsys object - - Returns - ------- - None. - - """ - nm=self.mdlnums[0] - for file in self.traj.files:self.CMX.send_command(self.CMXid,f"open '{file}' structureModel #{nm}") - - return self - - def close(self): - """ - Closes the model previously opened by this object - - Returns - ------- - None. - """ - if self.mdlnums is not None: - self.CMX.send_command(self.CMXid,f"close #{self.mdlnums[0]}") - self.mdlnums=None - return self - - def __call__(self): - """ - Starts a movie with the current trajectory. If a model is already - open in Chimera, then the trajectory is only updated, but the original - model is retained - - Returns - ------- - self - - """ - if self.mdlnums is None: - self.open_pdb() - self.update_traj() - return self - - def record(self,filename:str): - """ - Plays the current trajectory and records as an mp4 file. If the filename - is provided without an absolute path, then it will be stored in a - subfolder of the project folder ({project_folder}/movies). - - Otherwise it will be stored in the current directory - - Returns - ------- - filepath - - """ - if filename[-4:]!='.mp4':filename+='.mp4' - if self.project is not None and self.project.directory is not None: - if not(os.path.exists(os.path.join(self.project.directory,'movies'))): - os.mkdir(os.path.join(self.project.directory,'movies')) - filepath=os.path.join(os.path.join(self.project.directory,'movies'),filename) - else: - diff --git a/Selection/MovieSys.py b/Selection/MovieSys.py new file mode 100644 index 0000000..40aff7c --- /dev/null +++ b/Selection/MovieSys.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Mon Jan 22 13:18:10 2024 + +@author: albertsmith +""" + +from .. import clsDict +import os + +class MovieSys(): + def __init__(self,molsys): + self.molsys=molsys + self._mdlnums=None + self._ID=None + self.CMX=clsDict['CMXRemote'] + + @property + def project(self): + return self.molsys.project + + @property + def directory(self): + return self.molsys.directory + + @property + def traj(self): + return self.molsys.traj + + @property + def CMXid(self): + """ + ID of the current ChimeraX session. If there is no session, or the + previous session was closed or disconnected, then this will launch + a new ChimeraX session and update the ID. + + In case of a new launch, previous mdlnums are deleted since we have lost + access to them. + + Returns + ------- + int + ChimeraX session ID + + """ + if self._ID is not None and not(self.CMX.isConnected(self._ID)): + self._ID=None + self._mdlnums=None + + if self._ID is None: + if self.project is not None: + if self.project.chimera.CMXid is None: + self.project.chimera.current=0 + self._ID=self.project.chimera.CMXid + else: + self._ID=self.CMX.launch() + + return self._ID + + def command_line(self,cmds): + """ + Pass commands to the current chimeraX instance + + Parameters + ---------- + cmds : TYPE + DESCRIPTION. + + Returns + ------- + None. + + """ + if isinstance(cmds,list): + for cmd in cmds: + self.command_line(cmd) + return self + self.CMX.send_command(self.CMXid,cmds) + return self + + @property + def mdlnums(self): + if self._mdlnums is not None and self._mdlnums[0] not in self.CMX.valid_models(self.CMXid): + self._mdlnums=None + return self._mdlnums + + @mdlnums.setter + def mdlnums(self,mdlnums): + self._mdlnums=mdlnums + + def open_pdb(self): + """ + Launches a movie of the stored trajectory in ChimeraX + + Returns + ------- + self + + """ + + CMX=self.CMX + ID=self.CMXid + + om=CMX.how_many_models(ID) + CMX.send_command(ID,f'open "{self.molsys.topo}" coordset true') + while om==CMX.how_many_models(ID): + pass + self.mdlnums=CMX.valid_models(ID)[-1],CMX.how_many_models(ID)-1 + return self + + def update_traj(self): + """ + Replaces the current coordset in Chimera with whatever trajectory is + currently stored in the molsys object + + Returns + ------- + None. + + """ + nm=self.mdlnums[0] + for file in self.traj.files:self.CMX.send_command(self.CMXid,f'open "{file}" structureModel #{nm}') + + return self + + def close(self): + """ + Closes the model previously opened by this object + + Returns + ------- + None. + + """ + if self.mdlnums is not None: + self.CMX.send_command(self.CMXid,f"close #{self.mdlnums[0]}") + self.mdlnums=None + return self + + def __call__(self): + """ + Starts a movie with the current trajectory. If a model is already + open in Chimera, then the trajectory is only updated, but the original + model is retained + + Returns + ------- + self + + """ + if self.mdlnums is None: + self.open_pdb() + self.update_traj() + return self + + def record(self,filename:str,framerate=15): + """ + Plays the current trajectory and records as an mp4 file. If the filename + is provided without an absolute path, then it will be stored in a + subfolder of the project folder ({project_folder}/movies). + + Otherwise it will be stored in the current directory + + Returns + ------- + filepath + + """ + if filename[-4:]!='.mp4':filename+='.mp4' + if self.project is not None and self.project.directory is not None: + filename=os.path.join(self.project.directory,filename) + else: + filename=os.path.abspath(filename) + + mn=self.mdlnums[0] + + cxc=os.path.join(self.directory,'temp.cxc') + with open(cxc,'w') as f: + f.write('movie record\n') + f.write(f'coordset #{mn}\n') + f.write(f'wait {len(self.molsys.traj)}\n') + f.write(f'movie encode "{filename}" framerate {framerate}\n') + + self.command_line(f'open "{cxc}"') + return self \ No newline at end of file diff --git a/chimeraX/CMXEvents.py b/chimeraX/CMXEvents.py index 2126ea3..6beaf8e 100644 --- a/chimeraX/CMXEvents.py +++ b/chimeraX/CMXEvents.py @@ -29,10 +29,20 @@ class DetectorFader(): def __init__(self, cmx, *args): self.cmx = cmx self.session = cmx.session - self.fader = DetFader(self.session.models[-1], *args) + if isinstance(args[0],int): + mn=args[0] + args=args[1:] + else: + mn=-1 + self.fader = DetFader(self.session.models[mn], *args) def __call__(self): - self.fader.set_color_radius() + if self.fader is not None: + self.fader.set_color_radius() + + def cleanup(self): + self.fader=None + class TimescaleIndicator(): @@ -63,6 +73,10 @@ def __call__(self): self.label.text = text self.label.update_drawing() self.i = i + + def cleanup(self): + self.label.text='' + self.label.update_drawing() class Hover(): diff --git a/chimeraX/MovieTools.py b/chimeraX/MovieTools.py new file mode 100644 index 0000000..071543c --- /dev/null +++ b/chimeraX/MovieTools.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Mon Jan 22 13:33:06 2024 + +@author: albertsmith +""" + +import numpy as np + +def timescale_swp(zrange=[-10,-6],nframes:int=150): + """ + Returns a sweep of the timescale defined by zrange. zrange is given + by two values on a log scale, e.g. default [-10 -6] runs from 100 ps to 1 + μs. + + timescale = np.logspace(zrange[0],zrange[1],nframes) + + Parameters + ---------- + zrange : TYPE, optional + DESCRIPTION. The default is [-10,-6]. + nframes : TYPE + DESCRIPTION. + framerate : TYPE + DESCRIPTION. + + Returns + ------- + np.array + 1D array of the timescales on logscale + + """ + return np.linspace(zrange[0],zrange[1],nframes) + +def log_axis(traj,zrange=[-10,-6],nframes:int=150,framerate:int=15): + """ + Returns a time axis to cover timescales defined by zrange. zrange is given + by two values on a log scale, e.g. default [-10 -6] runs from 100 ps to 1 + μs. This means that at the input framerate (default 15 ns), 1 second of + movie will cover 100 ps of trajectory at the beginning of the axis and + 1 second of movie will cover 1 μs of trajectory at the end of the axis. + + timestep beginning: 100 ps/15 fps = 6.67 ps/frame + timestep end: 1 μs/15 fps = 66.67 ns / frame + + target timestep: + np.logspace(zrange[0]-np.log10(framerate),zrange[1]-np.log10(framerate),nframes-1) + + Note that if the initial timestep is too short for the stepsize of the + trajectory, then the initial timestep will just be the trajectory's + timestep, although this timestep will be retained up until it is shorter + than the target timestep. + + If the trajectory is not long enough to support the requested range of + timescales, then timesteps at the end of the trajectory will be shortened + until the time axis fits. + + traj is usually the Trajectory object contained in MolSys and MolSelect + objects. However, it may also be a tuple (dt,nt) with the timestep in + picoseconds and number of points in the trajectory. + + The returned time axis is rounded to match dt in the time axis. An index + of timepoints for the original axis is also returned + + Parameters + ---------- + traj : Trajectory object / tuple + Either a trajectory object, or tuple with dt (trajectory timestep) and + nt (number of frames in trajectory) + zrange : TYPE, optional + DESCRIPTION. The default is [-9,-5]. + nframes : TYPE, optional + DESCRIPTION. The default is 150. + framerate : TYPE, optional + DESCRIPTION. The default is 15. + + Returns + ------- + tuple + t: numpy float array (nanoseconds), index: numpy int array + + """ + + dt,nt=traj if len(traj)==2 else traj.dt,len(traj) + dt*=1e-12 + total=dt*(nt-1) + + Dt=np.logspace(zrange[0]-np.log10(framerate),zrange[1]-np.log10(framerate),nframes-1) + Dt[Dttotal + + Dt[tt>total]=Dt[np.argmax(i)-1] + + t=np.concatenate([[0],np.cumsum(Dt)]) + i=np.round(t/dt).astype(int) + t=i*dt*1e9 + + return t,i + +def lin_axis(traj,z:float=-9,nframes:int=75,framerate:int=15): + """ + Returns a time axis around the timescale defined by z, e.g. with the default + -9, one second of movie will cover 1 nanosecond at the input framerate. + + If the input z value cannot be matched due to the timestep or trajectory + length, then this will be adjusted + + Parameters + ---------- + traj : TYPE + DESCRIPTION. + z : TYPE, optional + DESCRIPTION. The default is -9. + nframes : TYPE, optional + DESCRIPTION. The default is 75. + framerate : TYPE, optional + DESCRIPTION. The default is 15. + + Returns + ------- + tuple + t: numpy float array (nanoseconds), index: numpy int array + + """ + dt,nt=traj if len(traj)==2 else traj.dt,len(traj) + dt*=1e-12 + total=dt*(nt-1) + + Dt=10**z + + if Dttotal: + print(f'Requested timescale ({10**z*1e9:.1f} ns) is too long for a trajectory with length={total*1e9:.1f} ns') + Dt=total/nframes + + t=np.arange(nframes)*Dt + i=np.round(t/dt).astype(int) + t=i*dt*1e9 + + return t,i + \ No newline at end of file diff --git a/chimeraX/Movies.py b/chimeraX/Movies.py index 1118522..de7f581 100644 --- a/chimeraX/Movies.py +++ b/chimeraX/Movies.py @@ -282,7 +282,7 @@ def play_traj(self,i:int=0,ID:int=None): self.chimera.command_line(cmds,ID=ID) def det_fader(self,rho_index=None,i:int=0,ID:int=None,scaling=None): - ID=self.defaultID(ID) + ID=self.defaultID(ID) if ID is None else ID rho_index=np.arange(self.data.sens.rhoz.shape[0]) if rho_index is None else np.array(rho_index,dtype=int) self.play_traj(i=i,ID=ID) x,ids=self.det_x_id(i)