diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a2bf6067..7b02036e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -27,6 +27,7 @@ repos: rev: 23.9.1 hooks: - id: black + args: ["--line-length", "120"] - repo: https://github.com/charliermarsh/ruff-pre-commit # Ruff version. rev: v0.0.292 diff --git a/compare_logs.py b/compare_logs.py index 71977e0a..5d5ef90a 100644 --- a/compare_logs.py +++ b/compare_logs.py @@ -93,11 +93,7 @@ def infNorm(x, **kwargs): plt.yscale("log") plt.legend() plt.grid(which="both") -plt.title( - "Comparison between {:s} and {:s}".format( - get_solver_name(data1), get_solver_name(data2) - ) -) +plt.title("Comparison between {:s} and {:s}".format(get_solver_name(data1), get_solver_name(data2))) # Error over time & mpc iteration Xerr_over_time = infNorm(Xerr[::WBC_RATIO], axis=2) diff --git a/examples/bench_croc_ocp.py b/examples/bench_croc_ocp.py index 51de483e..c6869ce5 100644 --- a/examples/bench_croc_ocp.py +++ b/examples/bench_croc_ocp.py @@ -75,23 +75,9 @@ def runShootingProblemCalcDiffBenchmark(xs, us, problem): print("Python bindings:") xs, us, problem = createProblem() avrg_duration, min_duration, max_duration = runDDPSolveBenchmark(xs, us, problem) -print( - " DDP.solve [ms]: {0} ({1}, {2})".format(avrg_duration, min_duration, max_duration) -) -avrg_duration, min_duration, max_duration = runShootingProblemCalcBenchmark( - xs, us, problem -) -print( - " ShootingProblem.calc [ms]: {0} ({1}, {2})".format( - avrg_duration, min_duration, max_duration - ) -) -avrg_duration, min_duration, max_duration = runShootingProblemCalcDiffBenchmark( - xs, us, problem -) -print( - " ShootingProblem.calcDiff [ms]: {0} ({1}, {2})".format( - avrg_duration, min_duration, max_duration - ) -) +print(" DDP.solve [ms]: {0} ({1}, {2})".format(avrg_duration, min_duration, max_duration)) +avrg_duration, min_duration, max_duration = runShootingProblemCalcBenchmark(xs, us, problem) +print(" ShootingProblem.calc [ms]: {0} ({1}, {2})".format(avrg_duration, min_duration, max_duration)) +avrg_duration, min_duration, max_duration = runShootingProblemCalcDiffBenchmark(xs, us, problem) +print(" ShootingProblem.calcDiff [ms]: {0} ({1}, {2})".format(avrg_duration, min_duration, max_duration)) print("\033[0m") diff --git a/python/quadruped_reactive_walking/controller.py b/python/quadruped_reactive_walking/controller.py index e02df358..3057ae6c 100644 --- a/python/quadruped_reactive_walking/controller.py +++ b/python/quadruped_reactive_walking/controller.py @@ -63,9 +63,7 @@ class Controller: t_mpc = 0.0 q_security = np.array([1.2, 2.1, 3.14] * 4) - def __init__( - self, params: qrw.Params, q_init, solver_cls: Type[wb_mpc.OCPAbstract] - ): + def __init__(self, params: qrw.Params, q_init, solver_cls: Type[wb_mpc.OCPAbstract]): """ Function that computes the reference control (tau, q_des, v_des and gains) @@ -98,9 +96,7 @@ def __init__( self.result.v_des = self.task.v0[6:].copy() self.target = Target(params) - self.footsteps, self.base_refs = make_footsteps_and_refs( - self.params, self.target - ) + self.footsteps, self.base_refs = make_footsteps_and_refs(self.params, self.target) self.default_footstep = make_initial_footstep(params.q_init) self.target_base = pin.Motion.Zero() @@ -134,15 +130,11 @@ def _create_mpc(self, solver_cls): if self.params.asynchronous_mpc: from .wbmpc_wrapper_ros_mp import ROSMPCAsyncClient - return ROSMPCAsyncClient( - self.params, self.footsteps, self.base_refs, solver_cls - ) + return ROSMPCAsyncClient(self.params, self.footsteps, self.base_refs, solver_cls) else: from .wbmpc_wrapper_ros import ROSMPCWrapperClient - return ROSMPCWrapperClient( - self.params, self.footsteps, self.base_refs, solver_cls, True - ) + return ROSMPCWrapperClient(self.params, self.footsteps, self.base_refs, solver_cls, True) else: if self.params.asynchronous_mpc: from .wbmpc_wrapper_multiprocess import ( @@ -181,9 +173,7 @@ def compute(self, device, qc=None): self.target_footstep[:] = 0.0 else: self.target_base.np[:] = 0.0 - self.target_footstep[:] = self.target.compute( - self.k + self.params.N_gait * self.params.mpc_wbc_ratio - ) + self.target_footstep[:] = self.target.compute(self.k + self.params.N_gait * self.params.mpc_wbc_ratio) if self.k % self.params.mpc_wbc_ratio == 0: if self.mpc_solved: @@ -197,9 +187,7 @@ def compute(self, device, qc=None): try: self.t_mpc_start = time.time() - self.mpc.solve( - self.k, x, self.target_footstep.copy(), self.target_base.copy() - ) + self.mpc.solve(self.k, x, self.target_footstep.copy(), self.target_base.copy()) except ValueError: import traceback @@ -301,14 +289,10 @@ def clamp_result(self, device, set_error=False): if self.clamp(self.result.q_des[3 * i + 1], -hip_max, hip_max): hip_ids.append(i) self.error = set_error - if self.task.q0[7 + 3 * i + 2] >= 0.0 and self.clamp( - self.result.q_des[3 * i + 2], knee_min - ): + if self.task.q0[7 + 3 * i + 2] >= 0.0 and self.clamp(self.result.q_des[3 * i + 2], knee_min): knee_ids.append(i) self.error = set_error - elif self.task.q0[7 + 3 * i + 2] <= 0.0 and self.clamp( - self.result.q_des[3 * i + 2], max_value=-knee_min - ): + elif self.task.q0[7 + 3 * i + 2] <= 0.0 and self.clamp(self.result.q_des[3 * i + 2], max_value=-knee_min): knee_ids.append(i) self.error = set_error if len(hip_ids) > 0: @@ -365,9 +349,7 @@ def save_guess(self, filename="/tmp/init_guess.npy"): ) print("Initial guess saved") - def run_estimator( - self, device, q_perfect=np.zeros(6), b_baseVel_perfect=np.zeros(3) - ): + def run_estimator(self, device, q_perfect=np.zeros(6), b_baseVel_perfect=np.zeros(3)): """ Call the estimator and retrieve the reference and estimated quantities. Run a filter on q, h_v and v_ref. @@ -418,9 +400,7 @@ def run_estimator( self.q_filtered = self.q_estimate.copy() self.q_filtered[:3] = self.base_position_filtered[:3] - self.q_filtered[3:7] = pin.Quaternion( - pin.rpy.rpyToMatrix(self.base_position_filtered[3:]) - ).coeffs() + self.q_filtered[3:7] = pin.Quaternion(pin.rpy.rpyToMatrix(self.base_position_filtered[3:])).coeffs() self.v_filtered = self.v_estimate.copy() # self.v_filtered[:6] = np.zeros(6) # self.v_filtered[:6] = self.filter_v.filter(self.v_windowed, False) diff --git a/python/quadruped_reactive_walking/main_solo12_control.py b/python/quadruped_reactive_walking/main_solo12_control.py index a14b75cb..31b38219 100644 --- a/python/quadruped_reactive_walking/main_solo12_control.py +++ b/python/quadruped_reactive_walking/main_solo12_control.py @@ -188,9 +188,7 @@ def main(args): device, qc = get_device(params.SIMULATION, sim_params["record_video"]) if params.LOGGING or params.PLOTTING: - logger = LoggerControl( - params, log_size=params.N_SIMULATION, solver_cls_name=args.solver - ) + logger = LoggerControl(params, log_size=params.N_SIMULATION, solver_cls_name=args.solver) else: logger = None @@ -217,9 +215,7 @@ def main(args): T_whole = time.time() dT_whole = 0.0 disable = params.ocp.verbose - bar_format = ( - "{desc}: {percentage:.4f}%|{bar}| {n:.3f}/{total:.3f} [{elapsed}<{remaining}]" - ) + bar_format = "{desc}: {percentage:.4f}%|{bar}| {n:.3f}/{total:.3f} [{elapsed}<{remaining}]" with tqdm.tqdm( desc="MPC cycles", total=t_max + params.dt_wbc, @@ -251,9 +247,7 @@ def main(args): device.joints.set_velocity_gains(controller.result.D) device.joints.set_desired_positions(controller.result.q_des) device.joints.set_desired_velocities(controller.result.v_des) - device.joints.set_torques( - controller.result.FF_weight * controller.result.tau_ff.ravel() - ) + device.joints.set_torques(controller.result.FF_weight * controller.result.tau_ff.ravel()) device.send_command_and_wait_end_of_cycle(params.dt_wbc) if params.LOGGING or params.PLOTTING: diff --git a/python/quadruped_reactive_walking/ocp_defs/walking.py b/python/quadruped_reactive_walking/ocp_defs/walking.py index 2b4bf07c..4f6c9d24 100644 --- a/python/quadruped_reactive_walking/ocp_defs/walking.py +++ b/python/quadruped_reactive_walking/ocp_defs/walking.py @@ -38,12 +38,8 @@ def __init__(self, params: Params, footsteps, base_vel_refs): axis=0, ) - self.life_rm, self.life_tm = self.initialize_models_from_gait( - self.life_gait, footsteps, base_vel_refs - ) - self.start_rm, self.start_tm = self.initialize_models_from_gait( - self.starting_gait - ) + self.life_rm, self.life_tm = self.initialize_models_from_gait(self.life_gait, footsteps, base_vel_refs) + self.start_rm, self.start_tm = self.initialize_models_from_gait(self.starting_gait) self.end_rm, self.end_tm = self.initialize_models_from_gait(self.ending_gait) self.x0 = self.task.x0 @@ -68,12 +64,7 @@ def select_next_model(self, k, current_gait, base_vel_ref): no_copy_roll_insert(current_gait, self.life_gait[-1]) else: - i = ( - 0 - if t - == len(self.start_rm) + len(self.life_rm) * self.params.gait_repetitions - else 1 - ) + i = 0 if t == len(self.start_rm) + len(self.life_rm) * self.params.gait_repetitions else 1 # choose to pich the node with impact or not support_feet = feet_ids[self.ending_gait[i] == 1] model = self.end_rm[i] @@ -98,29 +89,19 @@ def initialize_models_from_gait(self, gait, footsteps=None, base_vel_refs=None): feet_ids = np.asarray(self.task.feet_ids) for t in range(gait.shape[0]): support_feet_ids = feet_ids[gait[t] == 1] - feet_pos = ( - get_active_feet(footsteps[t], support_feet_ids) - if footsteps is not None - else [] - ) + feet_pos = get_active_feet(footsteps[t], support_feet_ids) if footsteps is not None else [] base_vel_ref = base_vel_refs[t] if base_vel_refs is not None else None has_switched = np.any(gait[t] != gait[t - 1]) switch_matrix = gait[t] if has_switched else np.array([]) switch_feet = feet_ids[switch_matrix == 1] - running_models.append( - self.make_running_model( - support_feet_ids, switch_feet, feet_pos, base_vel_ref - ) - ) + running_models.append(self.make_running_model(support_feet_ids, switch_feet, feet_pos, base_vel_ref)) support_feet_ids = feet_ids[gait[-1] == 1] terminal_model = self.make_terminal_model(support_feet_ids) return running_models, terminal_model - def _create_standard_model( - self, support_feet - ) -> crocoddyl.IntegratedActionModelAbstract: + def _create_standard_model(self, support_feet) -> crocoddyl.IntegratedActionModelAbstract: """ Create a standard integrated action model, to be modified by the callee. @@ -160,14 +141,10 @@ def _create_standard_model( ActivationBounds(-self.task.state_limit, self.task.state_limit), self.task.state_bound_w**2, ) - state_bound_cost = CostModelResidual( - self.state, activation, state_bound_residual - ) + state_bound_cost = CostModelResidual(self.state, activation, state_bound_residual) costs.addCost("state_limitBound", state_bound_cost, 1) - diff = DifferentialActionModelContactFwdDynamics( - self.state, actuation, contacts, costs, 0.0, True - ) + diff = DifferentialActionModelContactFwdDynamics(self.state, actuation, contacts, costs, 0.0, True) return IntegratedActionModelEuler(diff, self.params.dt_mpc) def make_running_model( @@ -213,17 +190,13 @@ def make_running_model( def _add_control_costs(self, costs: CostModelSum): nu = costs.nu - control_reg = CostModelResidual( - self.state, ResidualModelControl(self.state, self.task.uref) - ) + control_reg = CostModelResidual(self.state, ResidualModelControl(self.state, self.task.uref)) costs.addCost("control_reg", control_reg, self.task.control_reg_w) control_bound_activation = crocoddyl.ActivationModelQuadraticBarrier( ActivationBounds(-self.task.effort_limit, self.task.effort_limit) ) - control_bound = CostModelResidual( - self.state, control_bound_activation, ResidualModelControl(self.state, nu) - ) + control_bound = CostModelResidual(self.state, control_bound_activation, ResidualModelControl(self.state, nu)) costs.addCost("control_bound", control_bound, self.task.control_bound_w) def make_terminal_model(self, support_feet): @@ -241,13 +214,9 @@ def make_terminal_model(self, support_feet): def _add_friction_cost(self, i: int, support_feet, costs: CostModelSum): nu = costs.nu # Contact forces - cone = crocoddyl.FrictionCone( - self.task.Rsurf, self.task.friction_mu, 4, False, 3 - ) + cone = crocoddyl.FrictionCone(self.task.Rsurf, self.task.friction_mu, 4, False, 3) residual = crocoddyl.ResidualModelContactFrictionCone(self.state, i, cone, nu) - activation = crocoddyl.ActivationModelQuadraticBarrier( - ActivationBounds(cone.lb, cone.ub) - ) + activation = crocoddyl.ActivationModelQuadraticBarrier(ActivationBounds(cone.lb, cone.ub)) friction_cost = CostModelResidual(self.state, activation, residual) friction_name = self.rmodel.frames[i].name + "_friction_cost" costs.addCost(friction_name, friction_cost, self.task.friction_cone_w) @@ -275,9 +244,7 @@ def _add_foot_track_cost(self, i: int, costs: CostModelSum): nu = costs.nu # Tracking foot trajectory name = self.rmodel.frames[i].name + "_foot_tracking" - residual = crocoddyl.ResidualModelFrameTranslation( - self.state, i, np.zeros(3), nu - ) + residual = crocoddyl.ResidualModelFrameTranslation(self.state, i, np.zeros(3), nu) foot_tracking = CostModelResidual(self.state, residual) costs.addCost(name, foot_tracking, self.task.foot_tracking_w) costs.changeCostStatus(name, False) @@ -286,17 +253,11 @@ def _add_ground_coll_penalty(self, i: int, costs: CostModelSum, start_pos): nu = costs.nu # Swing foot - ground_coll_res = crocoddyl.ResidualModelFrameTranslation( - self.state, i, start_pos, nu - ) + ground_coll_res = crocoddyl.ResidualModelFrameTranslation(self.state, i, start_pos, nu) - bounds = ActivationBounds( - np.array([-1000, -1000, start_pos[2]]), np.array([1000, 1000, 1000]) - ) + bounds = ActivationBounds(np.array([-1000, -1000, start_pos[2]]), np.array([1000, 1000, 1000])) ground_coll_activ = crocoddyl.ActivationModelQuadraticBarrier(bounds) - ground_coll_cost = CostModelResidual( - self.state, ground_coll_activ, ground_coll_res - ) + ground_coll_cost = CostModelResidual(self.state, ground_coll_activ, ground_coll_res) name = "{}_groundCol".format(self.rmodel.frames[i].name) costs.addCost( @@ -337,9 +298,7 @@ def _add_vert_velocity_cost(self, i: int, costs: CostModelSum): pin.ReferenceFrame.WORLD, nu, ) - vertical_velocity_activation = ActivationModelWeightedQuad( - np.array([0, 0, 1, 0, 0, 0]) - ) + vertical_velocity_activation = ActivationModelWeightedQuad(np.array([0, 0, 1, 0, 0, 0])) name = "{}_vel_zReg".format(self.rmodel.frames[i].name) vertical_velocity_reg_cost = CostModelResidual( @@ -420,13 +379,9 @@ def update_model( name = self.rmodel.frames[i].name + "_contact" model.differential.contacts.changeContactStatus(name, i in support_feet) if not is_terminal: - self.update_tracking_costs( - model.differential.costs, feet_pos, base_vel_ref, support_feet - ) + self.update_tracking_costs(model.differential.costs, feet_pos, base_vel_ref, support_feet) - def update_tracking_costs( - self, costs, feet_pos: List[np.ndarray], base_vel_ref: pin.Motion, support_feet - ): + def update_tracking_costs(self, costs, feet_pos: List[np.ndarray], base_vel_ref: pin.Motion, support_feet): index = 0 for i in self.task.feet_ids: if self.has_foot_track_cost: diff --git a/python/quadruped_reactive_walking/tools/logger_control.py b/python/quadruped_reactive_walking/tools/logger_control.py index b9ea14c5..f33c3a50 100644 --- a/python/quadruped_reactive_walking/tools/logger_control.py +++ b/python/quadruped_reactive_walking/tools/logger_control.py @@ -160,26 +160,14 @@ def sample(self, controller: Controller, device, qualisys=None): if self.i == 0: for i in range(self.params.N_gait * self.params.mpc_wbc_ratio): - self.target[i] = controller.footsteps[i // self.params.mpc_wbc_ratio][ - :, 1 - ] - self.target_base_linear[i] = controller.base_refs[ - i // self.params.mpc_wbc_ratio - ].linear - self.target_base_angular[i] = controller.base_refs[ - i // self.params.mpc_wbc_ratio - ].angular + self.target[i] = controller.footsteps[i // self.params.mpc_wbc_ratio][:, 1] + self.target_base_linear[i] = controller.base_refs[i // self.params.mpc_wbc_ratio].linear + self.target_base_angular[i] = controller.base_refs[i // self.params.mpc_wbc_ratio].angular if self.i + self.params.N_gait * self.params.mpc_wbc_ratio < self.log_size: - self.target[ - self.i + self.params.N_gait * self.params.mpc_wbc_ratio - ] = controller.target_footstep[:, 1] - self.target_base_linear[ - self.i + self.params.N_gait * self.params.mpc_wbc_ratio - ] = controller.v_ref[:][:3] + self.target[self.i + self.params.N_gait * self.params.mpc_wbc_ratio] = controller.target_footstep[:, 1] + self.target_base_linear[self.i + self.params.N_gait * self.params.mpc_wbc_ratio] = controller.v_ref[:][:3] - self.target_base_angular[ - self.i + self.params.N_gait * self.params.mpc_wbc_ratio - ] = controller.v_ref[:][3:] + self.target_base_angular[self.i + self.params.N_gait * self.params.mpc_wbc_ratio] = controller.v_ref[:][3:] if not self.params.asynchronous_mpc and not self.params.mpc_in_rosnode: self.t_ocp_update[self.i] = controller.mpc.ocp.t_update @@ -214,9 +202,7 @@ def plot_states(self, save=False, filename=TEMP_DIRNAME): legend = ["Hip", "Shoulder", "Knee"] figsize = (18, 6) - fig: plt.Figure = plt.figure( - figsize=figsize, dpi=FIG_DPI, constrained_layout=True - ) + fig: plt.Figure = plt.figure(figsize=figsize, dpi=FIG_DPI, constrained_layout=True) gridspec = fig.add_gridspec(1, 2) gs0 = gridspec[0].subgridspec(2, 2) gs1 = gridspec[1].subgridspec(2, 2) @@ -259,10 +245,7 @@ def plot_torques(self, save=False, filename=TEMP_DIRNAME): for i in range(4): plt.subplot(2, 2, i + 1) plt.title("Joint torques of " + str(i)) - [ - plt.plot(np.array(self.torquesFromCurrentMeasurment)[:, (3 * i + jj)]) - for jj in range(3) - ] + [plt.plot(np.array(self.torquesFromCurrentMeasurment)[:, (3 * i + jj)]) for jj in range(3)] plt.ylabel("Torque [Nm]") plt.xlabel("$t$ [s]") plt.legend(legend) @@ -273,23 +256,15 @@ def plot_torques(self, save=False, filename=TEMP_DIRNAME): def plot_target(self, save=False, filename=TEMP_DIRNAME): import matplotlib.pyplot as plt - t_range = np.array( - [k * self.params.dt_wbc for k in range(self.tstamps.shape[0])] - ) + t_range = np.array([k * self.params.dt_wbc for k in range(self.tstamps.shape[0])]) x = np.concatenate([self.q_filtered, self.v_filtered], axis=1) - m_feet_p_log = { - idx: get_translation_array(self.pd.model, x, idx)[0] - for idx in self.pd.feet_ids - } + m_feet_p_log = {idx: get_translation_array(self.pd.model, x, idx)[0] for idx in self.pd.feet_ids} x_mpc = [self.ocp_xs[0][0, :]] [x_mpc.append(x[1, :]) for x in self.ocp_xs[:-1]] x_mpc = np.array(x_mpc) - feet_p_log = { - idx: get_translation_array(self.pd.model, x_mpc, idx)[0] - for idx in self.pd.feet_ids - } + feet_p_log = {idx: get_translation_array(self.pd.model, x_mpc, idx)[0] for idx in self.pd.feet_ids} # Target plot _, axs = plt.subplots(3, 2, sharex=True) @@ -338,10 +313,7 @@ def plot_target(self, save=False, filename=TEMP_DIRNAME): legend = ["x", "y", "z"] for p in range(3): axs[p].set_title("Predicted free foot on z over " + legend[p]) - [ - axs[p].plot(t_range, feet_p_log[foot_id][:, p]) - for foot_id in self.pd.feet_ids - ] + [axs[p].plot(t_range, feet_p_log[foot_id][:, p]) for foot_id in self.pd.feet_ids] axs[p].legend(self.pd.feet_names) if save: @@ -376,18 +348,14 @@ def plot_riccati_gains(self, n, save=False, filename=TEMP_DIRNAME): def plot_controller_times(self, save=False, filename=TEMP_DIRNAME): import matplotlib.pyplot as plt - t_range = np.array( - [k * self.params.dt_mpc for k in range(self.tstamps.shape[0])] - ) + t_range = np.array([k * self.params.dt_mpc for k in range(self.tstamps.shape[0])]) alpha = 0.7 plt.figure(figsize=(9, 6), dpi=FIG_DPI) plt.plot(t_range, self.t_measures, "r+", alpha=alpha, label="Estimation") plt.plot(t_range, self.t_mpc, "g+", alpha=alpha, label="MPC (total)") # plt.plot(t_range, self.t_send, c="pink", marker="+", alpha=alpha, label="Sending command") - plt.plot( - t_range, self.t_loop, "+", c="violet", alpha=alpha, label="Entire loop" - ) + plt.plot(t_range, self.t_loop, "+", c="violet", alpha=alpha, label="Entire loop") plt.plot( t_range, self.t_ocp_ddp, @@ -416,9 +384,7 @@ def plot_controller_times(self, save=False, filename=TEMP_DIRNAME): def plot_ocp_times(self): import matplotlib.pyplot as plt - t_range = np.array( - [k * self.params.dt_mpc for k in range(self.tstamps.shape[0])] - ) + t_range = np.array([k * self.params.dt_mpc for k in range(self.tstamps.shape[0])]) plt.figure() plt.plot(t_range, self.t_ocp_update, "r+") diff --git a/python/quadruped_reactive_walking/tools/meshcat_viewer.py b/python/quadruped_reactive_walking/tools/meshcat_viewer.py index ac30045b..cc783159 100644 --- a/python/quadruped_reactive_walking/tools/meshcat_viewer.py +++ b/python/quadruped_reactive_walking/tools/meshcat_viewer.py @@ -5,9 +5,7 @@ from pinocchio.visualize import MeshcatVisualizer -def make_meshcat_viz( - robot: RobotWrapper, meshColor=(0.6, 0.1, 0.1, 0.8) -) -> MeshcatVisualizer: +def make_meshcat_viz(robot: RobotWrapper, meshColor=(0.6, 0.1, 0.1, 0.8)) -> MeshcatVisualizer: import hppfcl plane = hppfcl.Plane(np.array([0, 0, 1]), 0.0) @@ -17,9 +15,7 @@ def make_meshcat_viz( vmodel = robot.visual_model vmodel.addGeometryObject(geobj) - vizer = MeshcatVisualizer( - robot.model, robot.collision_model, vmodel, data=robot.data - ) + vizer = MeshcatVisualizer(robot.model, robot.collision_model, vmodel, data=robot.data) vizer.initViewer(loadModel=True) vizer.setBackgroundColor() return vizer diff --git a/python/quadruped_reactive_walking/tools/plotting.py b/python/quadruped_reactive_walking/tools/plotting.py index 6f3bf1a1..f6674db0 100644 --- a/python/quadruped_reactive_walking/tools/plotting.py +++ b/python/quadruped_reactive_walking/tools/plotting.py @@ -21,24 +21,15 @@ def plot_mpc(task, mpc_result: MPCResult, base=False, joints=True): legend = ["Hip", "Shoulder", "Knee"] _, axs = plt.subplots(3, 4, sharex=True) for foot in range(4): - [ - axs[0, foot].plot(np.array(mpc_result.xs)[:, 7 + 3 * foot + joint]) - for joint in range(3) - ] + [axs[0, foot].plot(np.array(mpc_result.xs)[:, 7 + 3 * foot + joint]) for joint in range(3)] axs[0, foot].legend(legend) axs[0, foot].set_title("Joint positions for " + task.feet_names[foot]) - [ - axs[1, foot].plot(np.array(mpc_result.xs)[:, 19 + 6 + 3 * foot + joint]) - for joint in range(3) - ] + [axs[1, foot].plot(np.array(mpc_result.xs)[:, 19 + 6 + 3 * foot + joint]) for joint in range(3)] axs[1, foot].legend(legend) axs[1, foot].set_title("Joint velocity for " + task.feet_names[foot]) - [ - axs[2, foot].plot(np.array(mpc_result.us)[:, 3 * foot + joint]) - for joint in range(3) - ] + [axs[2, foot].plot(np.array(mpc_result.us)[:, 3 * foot + joint]) for joint in range(3)] axs[2, foot].legend(legend) axs[2, foot].set_title("Joint torques for foot " + task.feet_names[foot]) diff --git a/python/quadruped_reactive_walking/tools/pybullet_sim.py b/python/quadruped_reactive_walking/tools/pybullet_sim.py index e22b9540..c99898ac 100644 --- a/python/quadruped_reactive_walking/tools/pybullet_sim.py +++ b/python/quadruped_reactive_walking/tools/pybullet_sim.py @@ -32,9 +32,7 @@ class PybulletWrapper: def __init__(self, q_init, env_id, use_flat_plane, enable_pyb_GUI, dt=0.001): self.applied_force = np.zeros(3) self.enable_gui = enable_pyb_GUI - GUI_OPTIONS = "--width={} --height={}".format( - VIDEO_CONFIG["width"], VIDEO_CONFIG["height"] - ) + GUI_OPTIONS = "--width={} --height={}".format(VIDEO_CONFIG["width"], VIDEO_CONFIG["height"]) # Start the client for PyBullet if self.enable_gui: @@ -237,9 +235,7 @@ def __init__(self, q_init, env_id, use_flat_plane, enable_pyb_GUI, dt=0.001): # Load Quadruped robot robotStartPos = [0, 0, 0] robotStartOrientation = pyb.getQuaternionFromEuler([0.0, 0.0, 0.0]) - pyb.setAdditionalSearchPath( - EXAMPLE_ROBOT_DATA_MODEL_DIR + "/solo_description/robots" - ) + pyb.setAdditionalSearchPath(EXAMPLE_ROBOT_DATA_MODEL_DIR + "/solo_description/robots") self.robotId = pyb.loadURDF("solo12.urdf", robotStartPos, robotStartOrientation) # Disable default motor control for revolute joints @@ -254,9 +250,7 @@ def __init__(self, q_init, env_id, use_flat_plane, enable_pyb_GUI, dt=0.001): # Initialize the robot in a specific configuration self.q_init = np.array([q_init]).transpose() - pyb.resetJointStatesMultiDof( - self.robotId, self.revoluteJointIndices, self.q_init - ) + pyb.resetJointStatesMultiDof(self.robotId, self.revoluteJointIndices, self.q_init) # Enable torque control for revolute joints jointTorques = [0.0 for m in self.revoluteJointIndices] @@ -270,9 +264,7 @@ def __init__(self, q_init, env_id, use_flat_plane, enable_pyb_GUI, dt=0.001): # adjuste the robot Z position to be barely in contact with the ground z_offset = 0 while True: - closest_points = pyb.getClosestPoints( - self.robotId, self.planeId, distance=0.1 - ) + closest_points = pyb.getClosestPoints(self.robotId, self.planeId, distance=0.1) lowest_dist = 1e10 for pair in closest_points: robot_point = pair[5] @@ -381,9 +373,7 @@ def check_pyb_env(self, k, env_id, q): useMaximalCoordinates=True, ) pyb.changeDynamics(tmpId, -1, lateralFriction=1.0) - pyb.resetBasePositionAndOrientation( - self.robotId, [0, 0, 0.25], [0, 0, 0, 1] - ) + pyb.resetBasePositionAndOrientation(self.robotId, [0, 0, 0.25], [0, 0, 0, 1]) if self.enable_gui: self.set_debug_camera(q) @@ -742,9 +732,7 @@ def parse_sensor_data(self): """ # Position and velocity of actuators - jointStates = pyb.getJointStates( - self.pyb_sim.robotId, self.revoluteJointIndices - ) # State of all joints + jointStates = pyb.getJointStates(self.pyb_sim.robotId, self.revoluteJointIndices) # State of all joints self.joints.positions[:] = np.array([state[0] for state in jointStates]) self.joints.velocities[:] = np.array([state[1] for state in jointStates]) @@ -769,9 +757,7 @@ def parse_sensor_data(self): self.oMb = pin.SE3(self.rot_oMb, np.array([self.height]).transpose()) # Angular velocities of the base - self.imu.gyroscope[:] = ( - self.oMb.rotation.transpose() @ np.array([self.baseVel[1]]).transpose() - ).ravel() + self.imu.gyroscope[:] = (self.oMb.rotation.transpose() @ np.array([self.baseVel[1]]).transpose()).ravel() # Linear Acceleration of the base self.o_baseVel = np.array([self.baseVel[0]]).transpose() @@ -785,10 +771,7 @@ def parse_sensor_data(self): self.oMb.rotation.transpose() @ (self.o_imuVel - self.prev_o_imuVel) ).ravel() / self.dt self.prev_o_imuVel[:, 0:1] = self.o_imuVel - self.imu.accelerometer[:] = ( - self.imu.linear_acceleration - + (self.oMb.rotation.transpose() @ self.g).ravel() - ) + self.imu.accelerometer[:] = self.imu.linear_acceleration + (self.oMb.rotation.transpose() @ self.g).ravel() def send_command_and_wait_end_of_cycle(self, WaitEndOfCycle=True): """ @@ -803,9 +786,7 @@ def send_command_and_wait_end_of_cycle(self, WaitEndOfCycle=True): self.joints.velocities[:] = np.array([state[1] for state in joints]) # Compute PD torques - tau_pd = self.P * (self.q_des - self.joints.positions) + self.D * ( - self.v_des - self.joints.velocities - ) + tau_pd = self.P * (self.q_des - self.joints.positions) + self.D * (self.v_des - self.joints.velocities) # Save desired torques in a storage array self.jointTorques = tau_pd + self.tau_ff diff --git a/python/quadruped_reactive_walking/tools/qualisys_client.py b/python/quadruped_reactive_walking/tools/qualisys_client.py index f83066cc..fbba7c1f 100644 --- a/python/quadruped_reactive_walking/tools/qualisys_client.py +++ b/python/quadruped_reactive_walking/tools/qualisys_client.py @@ -123,9 +123,7 @@ def on_packet(packet): timestamp = packet.timestamp * 1e-6 # Get the last position and Rotation matrix from the shared memory. - last_position = np.array( - [shared_bodyPosition[0], shared_bodyPosition[1], shared_bodyPosition[2]] - ) + last_position = np.array([shared_bodyPosition[0], shared_bodyPosition[1], shared_bodyPosition[2]]) last_rotation = np.array( [ [ diff --git a/python/quadruped_reactive_walking/tools/ros_tools.py b/python/quadruped_reactive_walking/tools/ros_tools.py index c04d02b6..8b78bdba 100644 --- a/python/quadruped_reactive_walking/tools/ros_tools.py +++ b/python/quadruped_reactive_walking/tools/ros_tools.py @@ -7,9 +7,7 @@ def numpy_to_multiarray_float64(np_array): multiarray = Float64MultiArray() multiarray.layout.dim = [ - MultiArrayDimension( - "dim%d" % i, np_array.shape[i], np_array.shape[i] * np_array.dtype.itemsize - ) + MultiArrayDimension("dim%d" % i, np_array.shape[i], np_array.shape[i] * np_array.dtype.itemsize) for i in range(np_array.ndim) ] multiarray.data = np_array.ravel().tolist() @@ -57,15 +55,11 @@ def result_cb(fut): print('Waiting...') """ - def __init__( - self, service_name, service_type, persistent=True, headers=None, callback=None - ): + def __init__(self, service_name, service_type, persistent=True, headers=None, callback=None): """Create an asynchronous service proxy.""" self.executor = ThreadPoolExecutor(max_workers=1) - self.service_proxy = rospy.ServiceProxy( - service_name, service_type, persistent, headers - ) + self.service_proxy = rospy.ServiceProxy(service_name, service_type, persistent, headers) self.callback = callback def __call__(self, *args, **kwargs): diff --git a/python/quadruped_reactive_walking/wb_mpc/__init__.py b/python/quadruped_reactive_walking/wb_mpc/__init__.py index 5db4130b..d74a0466 100644 --- a/python/quadruped_reactive_walking/wb_mpc/__init__.py +++ b/python/quadruped_reactive_walking/wb_mpc/__init__.py @@ -10,9 +10,7 @@ except ImportError: import warnings - warnings.warn( - "ProxDDP is not installed. The corresponding solvers will not be available." - ) + warnings.warn("ProxDDP is not installed. The corresponding solvers will not be available.") def get_ocp_from_str(type_str): diff --git a/python/quadruped_reactive_walking/wb_mpc/ocp_crocoddyl.py b/python/quadruped_reactive_walking/wb_mpc/ocp_crocoddyl.py index f0603cb9..8c466838 100644 --- a/python/quadruped_reactive_walking/wb_mpc/ocp_crocoddyl.py +++ b/python/quadruped_reactive_walking/wb_mpc/ocp_crocoddyl.py @@ -92,9 +92,7 @@ def push_node(self, k, x0, footsteps, base_vel_ref: Optional[pin.Motion]): if k == 0: return - model, support_feet, base_vel_ref = self._builder.select_next_model( - k, self.current_gait, base_vel_ref - ) + model, support_feet, base_vel_ref = self._builder.select_next_model(k, self.current_gait, base_vel_ref) active_feet_pos = get_active_feet(footsteps, support_feet) self._builder.update_model(model, active_feet_pos, base_vel_ref, support_feet) self.circular_append(model) diff --git a/python/quadruped_reactive_walking/wb_mpc/ocp_proxddp.py b/python/quadruped_reactive_walking/wb_mpc/ocp_proxddp.py index de869439..6b7426b1 100644 --- a/python/quadruped_reactive_walking/wb_mpc/ocp_proxddp.py +++ b/python/quadruped_reactive_walking/wb_mpc/ocp_proxddp.py @@ -37,9 +37,7 @@ def __init__( ): super().__init__(params, footsteps, base_refs) - self.algtr_problem: aligator.TrajOptProblem = ( - aligator.croc.convertCrocoddylProblem(self.croc_problem) - ) + self.algtr_problem: aligator.TrajOptProblem = aligator.croc.convertCrocoddylProblem(self.croc_problem) self.num_threads = params.ocp.num_threads if hasattr(self.croc_problem, "num_threads"): diff --git a/python/quadruped_reactive_walking/wb_mpc/target.py b/python/quadruped_reactive_walking/wb_mpc/target.py index 94580112..5de4e598 100644 --- a/python/quadruped_reactive_walking/wb_mpc/target.py +++ b/python/quadruped_reactive_walking/wb_mpc/target.py @@ -25,9 +25,7 @@ def __init__(self, params): self.velocity_lin_target = np.array([0.5, 0, 0]) self.velocity_ang_target = np.array([0, 0, 0]) # dim 6 - self.base_vel_ref = pin.Motion( - self.velocity_lin_target, self.velocity_ang_target - ) + self.base_vel_ref = pin.Motion(self.velocity_lin_target, self.velocity_ang_target) elif params.movement == "circle": self.A = np.array([0.05, 0.0, 0.04]) self.offset = np.array([0.05, 0, 0.05]) @@ -70,30 +68,14 @@ def compute(self, k): out[:, 1] = self._evaluate_step(1, k) out[2, 1] += 0.015 else: - out[0, 1] = ( - self.target_ramp_x[k] - if k < self.ramp_length - else self.target_ramp_x[-1] - ) - out[1, 1] = ( - self.target_ramp_y[k] - if k < self.ramp_length - else self.target_ramp_y[-1] - ) - out[2, 1] = ( - self.target_ramp_z[k] - if k < self.ramp_length - else self.target_ramp_z[-1] - ) + out[0, 1] = self.target_ramp_x[k] if k < self.ramp_length else self.target_ramp_x[-1] + out[1, 1] = self.target_ramp_y[k] if k < self.ramp_length else self.target_ramp_y[-1] + out[2, 1] = self.target_ramp_z[k] if k < self.ramp_length else self.target_ramp_z[-1] return out def _evaluate_circle(self, k, initial_position): - return ( - initial_position - + self.offset - + self.A * np.sin(2 * np.pi * self.freq * k * self.dt_wbc + self.phase) - ) + return initial_position + self.offset + self.A * np.sin(2 * np.pi * self.freq * k * self.dt_wbc + self.phase) def _evaluate_step(self, j, k): n_step = k // self.k_per_step diff --git a/python/quadruped_reactive_walking/wb_mpc/task_spec.py b/python/quadruped_reactive_walking/wb_mpc/task_spec.py index cc130009..88ceb575 100644 --- a/python/quadruped_reactive_walking/wb_mpc/task_spec.py +++ b/python/quadruped_reactive_walking/wb_mpc/task_spec.py @@ -15,9 +15,7 @@ def __init__(self, params: Params, frozen_names=[]): self.collision_model = self.robot.collision_model self.visual_model = self.robot.visual_model - self.robot_weight = ( - -sum([Y.mass for Y in self.model.inertias]) * self.model.gravity.linear[2] - ) + self.robot_weight = -sum([Y.mass for Y in self.model.inertias]) * self.model.gravity.linear[2] self.frozen_names = frozen_names self.frozen_idxs = [self.model.getJointId(id) for id in frozen_names] @@ -68,9 +66,7 @@ def __init__(self, params: Params): self.useFixedBase = 0 self.base_id = self.model.getFrameId("base_link") - self.state_limit = np.concatenate( - [np.full(18, np.inf), np.zeros(6), np.ones(12) * 800] - ) + self.state_limit = np.concatenate([np.full(18, np.inf), np.zeros(6), np.ones(12) * 800]) task_pms = params.task["walk"] @@ -90,9 +86,7 @@ def __init__(self, params: Params): self.control_bound_w = task_pms["control_bound_w"] self.control_reg_w = task_pms["control_reg_w"] - self.state_reg_w = np.array( - [0] * 3 + [0] * 3 + [1e2 * 3] * 12 + [0] * 6 + [1e1 * 2] * 12 - ) + self.state_reg_w = np.array([0] * 3 + [0] * 3 + [1e2 * 3] * 12 + [0] * 6 + [1e1 * 2] * 12) self.state_bound_w = np.array([0] * 18 + [0] * 6 + [0] * 12) self.terminal_velocity_w = np.zeros(2 * self.nv) self.terminal_velocity_w[self.nv :] = task_pms["terminal_velocity_w"] @@ -116,9 +110,7 @@ def __init__(self, params: Params): self.friction_cone_w = 1e3 self.control_bound_w = 1e3 self.control_reg_w = 1e0 - self.state_reg_w = np.array( - [1e2] * 3 + [1e-2] * 3 + [1e2] * 6 + [1e1] * 3 + [1e0] * 3 + [1e1] * 6 - ) + self.state_reg_w = np.array([1e2] * 3 + [1e-2] * 3 + [1e2] * 6 + [1e1] * 3 + [1e0] * 3 + [1e1] * 6) self.terminal_velocity_w = np.array([0] * 12 + [1e3] * 12) self.q0_reduced = self.q0[7:] diff --git a/python/quadruped_reactive_walking/wbmpc_wrapper_multiprocess.py b/python/quadruped_reactive_walking/wbmpc_wrapper_multiprocess.py index a06890f7..b8d3ae86 100644 --- a/python/quadruped_reactive_walking/wbmpc_wrapper_multiprocess.py +++ b/python/quadruped_reactive_walking/wbmpc_wrapper_multiprocess.py @@ -23,18 +23,14 @@ class MultiprocessMPCWrapper(MPCWrapperAbstract): Wrapper to run both types of MPC (OQSP or Crocoddyl) asynchronously in a new process """ - def __init__( - self, params: Params, footsteps, base_refs, solver_cls: Type[OCPAbstract] - ): + def __init__(self, params: Params, footsteps, base_refs, solver_cls: Type[OCPAbstract]): super().__init__(params) self.solver_cls = solver_cls self.footsteps_plan = footsteps self.base_refs = base_refs - self.last_available_result: MPCResult = MPCResult( - self.N_gait, self.nx, self.nu, self.ndx, self.WINDOW_SIZE - ) + self.last_available_result: MPCResult = MPCResult(self.N_gait, self.nx, self.nu, self.ndx, self.WINDOW_SIZE) # Shared memory used for multiprocessing self.smm = SharedMemoryManager() @@ -55,9 +51,7 @@ def __init__( self.gait_shared = self.create_shared_ndarray((self.N_gait + 1, 4), np.int32) self.xs_shared = self.create_shared_ndarray((self.WINDOW_SIZE + 1, self.nx)) self.us_shared = self.create_shared_ndarray((self.WINDOW_SIZE, self.nu)) - self.Ks_shared = self.create_shared_ndarray( - (self.WINDOW_SIZE, self.nu, self.ndx) - ) + self.Ks_shared = self.create_shared_ndarray((self.WINDOW_SIZE, self.nu, self.ndx)) self.footstep_shared = self.create_shared_ndarray((3, 4)) self.base_ref_shared = self.create_shared_ndarray(6) @@ -119,9 +113,7 @@ def _mpc_asynchronous(self): k, x0[:], footstep[:], base_ref[:] = self._get_shared_data_in() if k == 0: - loop_ocp = self.solver_cls( - self.params, self.footsteps_plan, self.base_refs - ) + loop_ocp = self.solver_cls(self.params, self.footsteps_plan, self.base_refs) loop_ocp.push_node(k, x0, footstep, base_ref) loop_ocp.solve(k) diff --git a/python/quadruped_reactive_walking/wbmpc_wrapper_ros.py b/python/quadruped_reactive_walking/wbmpc_wrapper_ros.py index 562cb999..7dcbe4a0 100644 --- a/python/quadruped_reactive_walking/wbmpc_wrapper_ros.py +++ b/python/quadruped_reactive_walking/wbmpc_wrapper_ros.py @@ -65,9 +65,7 @@ def __init__( self.solve_solver_srv = None if self.synchronous: - self.solve_solver_srv = rospy.ServiceProxy( - "qrw_wbmpc/solve", MPCSolve, persistent=True - ) + self.solve_solver_srv = rospy.ServiceProxy("qrw_wbmpc/solve", MPCSolve, persistent=True) else: self.solve_solver_srv = AsyncServiceProxy( "qrw_wbmpc/solve", MPCSolve, callback=self._result_cb, persistent=True @@ -90,9 +88,7 @@ def _result_cb(self, fut): def _parse_result(self, msg): assert msg.run_success, "Error while runnning solver on server" self.new_result = True - self.last_available_result.gait = multiarray_to_numpy_float64(msg.gait).astype( - np.int32 - ) + self.last_available_result.gait = multiarray_to_numpy_float64(msg.gait).astype(np.int32) self.last_available_result.xs = multiarray_to_listof_numpy_float64(msg.xs) self.last_available_result.us = multiarray_to_listof_numpy_float64(msg.us) self.last_available_result.K = multiarray_to_listof_numpy_float64(msg.K) @@ -112,24 +108,16 @@ def get_latest_result(self): def stop_parallel_loop(self): stop_solver_srv = rospy.ServiceProxy("qrw_wbmpc/stop", MPCStop) res = stop_solver_srv() - assert ( - res.success - ), "Unable to stop the MPC server. (Most probably stopped already)" + assert res.success, "Unable to stop the MPC server. (Most probably stopped already)" print("Stopped MPC server.") class ROSMPCWrapperServer: def __init__(self): self.is_init = False - self._init_service = rospy.Service( - "qrw_wbmpc/init", MPCInit, self._trigger_init - ) - self._solve_service = rospy.Service( - "qrw_wbmpc/solve", MPCSolve, self._trigger_solve - ) - self._stop_service = rospy.Service( - "qrw_wbmpc/stop", MPCStop, self._trigger_stop - ) + self._init_service = rospy.Service("qrw_wbmpc/init", MPCInit, self._trigger_init) + self._solve_service = rospy.Service("qrw_wbmpc/solve", MPCSolve, self._trigger_solve) + self._stop_service = rospy.Service("qrw_wbmpc/stop", MPCStop, self._trigger_stop) rospy.loginfo("Initializing MPC server.") def _trigger_init(self, msg): @@ -147,9 +135,7 @@ def _trigger_init(self, msg): self.solver_cls = get_ocp_from_str(msg.solver_type) footsteps = multiarray_to_numpy_float64(msg.footsteps) - base_refs = [ - pin.Motion(v_ref) for v_ref in multiarray_to_numpy_float64(msg.base_refs) - ] + base_refs = [pin.Motion(v_ref) for v_ref in multiarray_to_numpy_float64(msg.base_refs)] self.ocp = self.solver_cls(self.params, footsteps, base_refs) diff --git a/python/quadruped_reactive_walking/wbmpc_wrapper_ros_mp.py b/python/quadruped_reactive_walking/wbmpc_wrapper_ros_mp.py index f99fc9ae..cebf2afd 100644 --- a/python/quadruped_reactive_walking/wbmpc_wrapper_ros_mp.py +++ b/python/quadruped_reactive_walking/wbmpc_wrapper_ros_mp.py @@ -15,12 +15,8 @@ class ROSMPCAsyncClient(MultiprocessMPCWrapper): Wrapper to run both types of MPC (OQSP or Crocoddyl) asynchronously in a new process """ - def __init__( - self, params: Params, footsteps, base_refs, solver_cls: Type[OCPAbstract] - ): - self.ros_client = ROSMPCWrapperClient( - params, footsteps, base_refs, solver_cls, synchronous=True - ) + def __init__(self, params: Params, footsteps, base_refs, solver_cls: Type[OCPAbstract]): + self.ros_client = ROSMPCWrapperClient(params, footsteps, base_refs, solver_cls, synchronous=True) super().__init__(params, footsteps, base_refs, solver_cls) @@ -39,9 +35,7 @@ def _mpc_asynchronous(self): self.ros_client.solve(k, x0, footstep, base_ref) res: MPCResult = self.ros_client.get_latest_result() - self._put_shared_data_out( - res.gait, res.xs, res.us, res.K, res.num_iters, res.solving_duration - ) + self._put_shared_data_out(res.gait, res.xs, res.us, res.K, res.num_iters, res.solving_duration) self.new_result.value = True def stop_parallel_loop(self): diff --git a/python/quadruped_reactive_walking/wbmpc_wrapper_sync.py b/python/quadruped_reactive_walking/wbmpc_wrapper_sync.py index b8c59001..9aa74dcb 100644 --- a/python/quadruped_reactive_walking/wbmpc_wrapper_sync.py +++ b/python/quadruped_reactive_walking/wbmpc_wrapper_sync.py @@ -11,9 +11,7 @@ class SyncMPCWrapper(MPCWrapperAbstract): Wrapper to run both types of MPC (OQSP or Crocoddyl) in a synchronous manner in the main thread. """ - def __init__( - self, params: Params, footsteps, base_refs, solver_cls: Type[OCPAbstract] - ): + def __init__(self, params: Params, footsteps, base_refs, solver_cls: Type[OCPAbstract]): super().__init__(params) self.solver_cls = solver_cls diff --git a/scripts/Joystick.py b/scripts/Joystick.py index 9a33585e..eedc4ceb 100644 --- a/scripts/Joystick.py +++ b/scripts/Joystick.py @@ -104,12 +104,8 @@ def update_v_ref_gamepad(self, k_loop, is_static): self.vYaw = self.gp.rightJoystickX.value * self.vYawScale self.vZ = self.gp.rightJoystickY.value * self.vZScale - if ( - is_static and self.gp.L1Button.value - ): # If static the orientation of the base is controlled - self.v_gp = np.array( - [[0.0, 0.0, -self.vZ * 0.5, -self.vX * 5, -self.vY * 2, -self.vYaw]] - ).T + if is_static and self.gp.L1Button.value: # If static the orientation of the base is controlled + self.v_gp = np.array([[0.0, 0.0, -self.vZ * 0.5, -self.vX * 5, -self.vY * 2, -self.vYaw]]).T # print(self.v_gp.ravel()) else: # Otherwise the Vx, Vy, Vyaw is controlled self.v_gp = np.array([[-self.vY, -self.vX, 0.0, 0.0, 0.0, -self.vYaw]]).T @@ -176,9 +172,7 @@ def handle_v_switch(self, k): k (int): numero of the current iteration """ - self.v_ref[:, 0] = self.joyCpp.handle_v_switch( - k, self.k_switch.reshape((-1, 1)), self.v_switch - ) + self.v_ref[:, 0] = self.joyCpp.handle_v_switch(k, self.k_switch.reshape((-1, 1)), self.v_switch) def update_v_ref_predefined(self, k_loop, velID): """Update the reference velocity of the robot along X, Y and Yaw in local frame @@ -459,9 +453,7 @@ def update_v_ref_multi_simu(self, k_loop): def update_for_analysis(self, des_vel_analysis, N_analysis, N_steady): self.analysis = True - self.k_switch = np.array( - [0, int(1 / self.dt_wbc), N_analysis, N_analysis + N_steady] - ) + self.k_switch = np.array([0, int(1 / self.dt_wbc), N_analysis, N_analysis + N_steady]) self.v_switch = np.zeros((6, 4)) self.v_switch[:, 2] = des_vel_analysis self.v_switch[:, 3] = des_vel_analysis @@ -472,9 +464,7 @@ def update_for_analysis(self, des_vel_analysis, N_analysis, N_steady): if __name__ == "__main__": from matplotlib import pyplot as plt - params = ( - lqrw.Params.create_from_file() - ) # Object that holds all controller parameters + params = lqrw.Params.create_from_file() # Object that holds all controller parameters params.predefined_vel = False joystick = Joystick(params) k = 0 diff --git a/test_client.py b/test_client.py index 4a6568ad..46fad5c1 100644 --- a/test_client.py +++ b/test_client.py @@ -19,9 +19,7 @@ class TestClient: def __init__( self, ): - self.solve_solver_srv = rospy.ServiceProxy( - "qrw_wbmpc/test", MPCSolve, persistent=True - ) + self.solve_solver_srv = rospy.ServiceProxy("qrw_wbmpc/test", MPCSolve, persistent=True) def solve(self): return self.solve_solver_srv(MPCSolveRequest()) @@ -29,9 +27,7 @@ def solve(self): class TestServer: def __init__(self): - self._solve_service = rospy.Service( - "qrw_wbmpc/test", MPCSolve, self._trigger_solve - ) + self._solve_service = rospy.Service("qrw_wbmpc/test", MPCSolve, self._trigger_solve) from quadruped_reactive_walking import MPCResult, Params p = Params.create_from_file()