From fac30f1d71a3b9a49bc6c311a3c8bb778170fc4b Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 17 Jun 2024 18:52:19 -0400 Subject: [PATCH 1/4] feat: expose c1 migration cmds --- keramik/src/ipfs.md | 37 ++++++++ operator/src/network/cas.rs | 1 + operator/src/network/ceramic.rs | 61 +++++++------- operator/src/network/controller.rs | 130 +++++++++++++++++++++++++++++ operator/src/network/ipfs.rs | 33 +++++++- operator/src/network/spec.rs | 2 + 6 files changed, 233 insertions(+), 31 deletions(-) diff --git a/keramik/src/ipfs.md b/keramik/src/ipfs.md index 7a3db292..b70ab3a6 100644 --- a/keramik/src/ipfs.md +++ b/keramik/src/ipfs.md @@ -169,3 +169,40 @@ spec: commands: - ipfs config --json Swarm.RelayClient.Enabled false ``` + +## Migration from Kubo to Ceramic One + +A Kubo blockstore can be migrated to Ceramic One by specifying the migration command in the IPFS configuration. + +Example [network config](./setup_network.md) that uses Go based IPFS (i.e. Kubo) with its defaults for Ceramic (including a default +blockstore path of `/data/ipfs`) and the Ceramic network set to `dev-unstable`. + +```yaml +apiVersion: "keramik.3box.io/v1alpha1" +kind: Network +metadata: + name: kubo-migration-src +spec: + replicas: 5 + ceramic: + - ipfs: + go: {} + networkType: dev-unstable +``` + +Example [network config](./setup_network.md) that uses Ceramic One and specifies what migration command to run before +starting up the node. + +```yaml +apiVersion: "keramik.3box.io/v1alpha1" +kind: Network +metadata: + name: ceramic-one-migration-dest +spec: + replicas: 5 + ceramic: + - ipfs: + rust: + migrationCmd: + - from-ipfs -i /data/ipfs/blocks -o /data/ipfs/db.sqlite3 --network dev-unstable +``` diff --git a/operator/src/network/cas.rs b/operator/src/network/cas.rs index c5e40cbe..746f57a1 100644 --- a/operator/src/network/cas.rs +++ b/operator/src/network/cas.rs @@ -595,6 +595,7 @@ pub fn cas_ipfs_stateful_set_spec( ..Default::default() }), spec: Some(PodSpec { + init_containers: config.ipfs.init_container(net_config).map(|c| vec![c]), containers: vec![config.ipfs.container(ipfs_info, net_config)], volumes: Some(volumes), ..Default::default() diff --git a/operator/src/network/ceramic.rs b/operator/src/network/ceramic.rs index 8f8487e8..43b9390e 100644 --- a/operator/src/network/ceramic.rs +++ b/operator/src/network/ceramic.rs @@ -664,35 +664,40 @@ pub fn stateful_set_spec(ns: &str, bundle: &CeramicBundle<'_>) -> StatefulSetSpe .ipfs .container(&bundle.info, bundle.net_config), ], - init_containers: Some(vec![Container { - command: Some(vec![ - "/bin/bash".to_owned(), - "-c".to_owned(), - "/ceramic-init/ceramic-init.sh".to_owned(), - ]), - env: Some(init_env), - image: Some(bundle.config.init_image_name.to_owned()), - image_pull_policy: Some(bundle.config.image_pull_policy.to_owned()), - name: "init-ceramic-config".to_owned(), - resources: Some(ResourceRequirements { - limits: Some(bundle.config.resource_limits.clone().into()), - requests: Some(bundle.config.resource_limits.clone().into()), - ..Default::default() - }), - volume_mounts: Some(vec![ - VolumeMount { - mount_path: "/config".to_owned(), - name: "config-volume".to_owned(), - ..Default::default() - }, - VolumeMount { - mount_path: "/ceramic-init".to_owned(), - name: "ceramic-init".to_owned(), + init_containers: Some( + vec![Container { + command: Some(vec![ + "/bin/bash".to_owned(), + "-c".to_owned(), + "/ceramic-init/ceramic-init.sh".to_owned(), + ]), + env: Some(init_env), + image: Some(bundle.config.init_image_name.to_owned()), + image_pull_policy: Some(bundle.config.image_pull_policy.to_owned()), + name: "init-ceramic-config".to_owned(), + resources: Some(ResourceRequirements { + limits: Some(bundle.config.resource_limits.clone().into()), + requests: Some(bundle.config.resource_limits.clone().into()), ..Default::default() - }, - ]), - ..Default::default() - }]), + }), + volume_mounts: Some(vec![ + VolumeMount { + mount_path: "/config".to_owned(), + name: "config-volume".to_owned(), + ..Default::default() + }, + VolumeMount { + mount_path: "/ceramic-init".to_owned(), + name: "ceramic-init".to_owned(), + ..Default::default() + }, + ]), + ..Default::default() + }] + .into_iter() + .chain(bundle.config.ipfs.init_container(bundle.net_config)) + .collect(), + ), volumes: Some(volumes), security_context: Some(PodSecurityContext { fs_group: Some(70), diff --git a/operator/src/network/controller.rs b/operator/src/network/controller.rs index 84eda162..fe4604d5 100644 --- a/operator/src/network/controller.rs +++ b/operator/src/network/controller.rs @@ -4700,4 +4700,134 @@ mod tests { .expect("reconciler"); timeout_after_1s(mocksrv).await; } + #[tokio::test] + #[traced_test] + async fn migration_cmd() { + // Setup network spec and status + let network = Network::test().with_spec(NetworkSpec { + ceramic: Some(vec![CeramicSpec { + ipfs: Some(IpfsSpec::Rust(RustIpfsSpec { + migration_cmd: Some( + "from-ipfs -i /data/ipfs/blocks -o /data/ipfs/db.sqlite3 --network dev-unstable".to_string(), + ), + ..Default::default() + })), + ..Default::default() + }]), + ..Default::default() + }); + let mock_rpc_client = default_ipfs_rpc_mock(); + let mut stub = Stub::default().with_network(network.clone()); + stub.ceramics[0].stateful_set.patch(expect![[r#" + --- original + +++ modified + @@ -397,6 +397,95 @@ + "name": "ceramic-init" + } + ] + + }, + + { + + "command": [ + + "/usr/bin/ceramic-one", + + "migrations", + + "from-ipfs", + + "-i", + + "/data/ipfs/blocks", + + "-o", + + "/data/ipfs/db.sqlite3", + + "--network", + + "dev-unstable" + + ], + + "env": [ + + { + + "name": "CERAMIC_ONE_BIND_ADDRESS", + + "value": "0.0.0.0:5001" + + }, + + { + + "name": "CERAMIC_ONE_KADEMLIA_PARALLELISM", + + "value": "1" + + }, + + { + + "name": "CERAMIC_ONE_KADEMLIA_REPLICATION", + + "value": "6" + + }, + + { + + "name": "CERAMIC_ONE_LOCAL_NETWORK_ID", + + "value": "0" + + }, + + { + + "name": "CERAMIC_ONE_METRICS_BIND_ADDRESS", + + "value": "0.0.0.0:9465" + + }, + + { + + "name": "CERAMIC_ONE_NETWORK", + + "value": "local" + + }, + + { + + "name": "CERAMIC_ONE_STORE_DIR", + + "value": "/data/ipfs" + + }, + + { + + "name": "CERAMIC_ONE_SWARM_ADDRESSES", + + "value": "/ip4/0.0.0.0/tcp/4001" + + }, + + { + + "name": "RUST_LOG", + + "value": "info,ceramic_one=debug,multipart=error" + + } + + ], + + "image": "public.ecr.aws/r5b3e0r5/3box/ceramic-one:latest", + + "imagePullPolicy": "Always", + + "name": "ipfs", + + "ports": [ + + { + + "containerPort": 4001, + + "name": "swarm-tcp", + + "protocol": "TCP" + + }, + + { + + "containerPort": 5001, + + "name": "rpc", + + "protocol": "TCP" + + }, + + { + + "containerPort": 9465, + + "name": "metrics", + + "protocol": "TCP" + + } + + ], + + "resources": { + + "limits": { + + "cpu": "1", + + "ephemeral-storage": "1Gi", + + "memory": "1Gi" + + }, + + "requests": { + + "cpu": "1", + + "ephemeral-storage": "1Gi", + + "memory": "1Gi" + + } + + }, + + "volumeMounts": [ + + { + + "mountPath": "/data/ipfs", + + "name": "ipfs-data" + + } + + ] + } + ], + "securityContext": { + "#]]); + stub.cas_ipfs_stateful_set.patch(expect![[r#" + --- original + +++ modified + "#]]); + let (testctx, api_handle) = Context::test(mock_rpc_client); + let fakeserver = ApiServerVerifier::new(api_handle); + let mocksrv = stub.run(fakeserver); + reconcile(Arc::new(network), testctx) + .await + .expect("reconciler"); + timeout_after_1s(mocksrv).await; + } } diff --git a/operator/src/network/ipfs.rs b/operator/src/network/ipfs.rs index a3562a95..38473a60 100644 --- a/operator/src/network/ipfs.rs +++ b/operator/src/network/ipfs.rs @@ -9,6 +9,7 @@ use k8s_openapi::{ }; const IPFS_CONTAINER_NAME: &str = "ipfs"; +const IPFS_STORE_DIR: &str = "/data/ipfs"; pub const IPFS_DATA_PV_CLAIM: &str = "ipfs-data"; const IPFS_SERVICE_PORT: i32 = 5001; @@ -69,6 +70,12 @@ impl IpfsConfig { IpfsConfig::Go(config) => config.config_maps(&info), } } + pub fn init_container(&self, net_config: &NetworkConfig) -> Option { + match self { + IpfsConfig::Rust(config) => config.init_container(net_config), + _ => None, + } + } pub fn container(&self, info: impl Into, net_config: &NetworkConfig) -> Container { let info = info.into(); match self { @@ -98,6 +105,7 @@ pub struct RustIpfsConfig { storage: PersistentStorageConfig, rust_log: String, env: Option>, + migration_cmd: Option, } impl RustIpfsConfig { @@ -129,6 +137,7 @@ impl Default for RustIpfsConfig { }, rust_log: "info,ceramic_one=debug,multipart=error".to_owned(), env: None, + migration_cmd: None, } } } @@ -149,6 +158,7 @@ impl From for RustIpfsConfig { storage: PersistentStorageConfig::from_spec(value.storage, default.storage), rust_log: value.rust_log.unwrap_or(default.rust_log), env: value.env, + migration_cmd: value.migration_cmd, } } } @@ -177,7 +187,7 @@ impl RustIpfsConfig { }, EnvVar { name: "CERAMIC_ONE_STORE_DIR".to_owned(), - value: Some("/data/ipfs".to_owned()), + value: Some(IPFS_STORE_DIR.to_owned()), ..Default::default() }, EnvVar { @@ -253,7 +263,7 @@ impl RustIpfsConfig { ..Default::default() }), volume_mounts: Some(vec![VolumeMount { - mount_path: "/data/ipfs".to_owned(), + mount_path: IPFS_STORE_DIR.to_owned(), name: IPFS_DATA_PV_CLAIM.to_owned(), ..Default::default() }]), @@ -261,6 +271,23 @@ impl RustIpfsConfig { ..Default::default() } } + + fn init_container(&self, net_config: &NetworkConfig) -> Option { + if let Some(cmd) = &self.migration_cmd { + Some(Container { + command: Some( + vec!["/usr/bin/ceramic-one", "migrations"] + .into_iter() + .chain(cmd.split_whitespace()) + .map(ToOwned::to_owned) + .collect(), + ), + ..self.container(net_config) + }) + } else { + None + } + } } pub struct GoIpfsConfig { @@ -364,7 +391,7 @@ ipfs config --json Swarm.ResourceMgr.MaxFileDescriptors 500000 fn container(&self, info: &IpfsInfo) -> Container { let mut volume_mounts = vec![ VolumeMount { - mount_path: "/data/ipfs".to_owned(), + mount_path: IPFS_STORE_DIR.to_owned(), name: IPFS_DATA_PV_CLAIM.to_owned(), ..Default::default() }, diff --git a/operator/src/network/spec.rs b/operator/src/network/spec.rs index 66b68852..6c56d67e 100644 --- a/operator/src/network/spec.rs +++ b/operator/src/network/spec.rs @@ -204,6 +204,8 @@ pub struct RustIpfsSpec { /// Extra env values to pass to the image. /// CAUTION: Any env vars specified in this set will override any predefined values. pub env: Option>, + /// Migration command that should run before a node comes up + pub migration_cmd: Option, } /// Describes how the Go IPFS node for a peer should behave. From 4586551eaf2c5e1820e31c40f1b0ff9a39bb3257 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 17 Jun 2024 19:14:08 -0400 Subject: [PATCH 2/4] chore: build fixes --- runner/src/scenario/ceramic/mod.rs | 6 +++--- runner/src/scenario/ceramic/model_instance.rs | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/runner/src/scenario/ceramic/mod.rs b/runner/src/scenario/ceramic/mod.rs index 0aab8cb8..08fdefb8 100644 --- a/runner/src/scenario/ceramic/mod.rs +++ b/runner/src/scenario/ceramic/mod.rs @@ -30,7 +30,7 @@ impl Credentials { let did = Document::new(&std::env::var("DID_KEY").expect("DID_KEY is required")); let private_key = std::env::var("DID_PRIVATE_KEY").expect("DID_PRIVATE_KEY is required"); let signer = JwkSigner::new(did.clone(), &private_key).await?; - Ok(Self { signer, did }) + Ok(Self { signer }) } pub async fn admin_from_env() -> Result { @@ -51,13 +51,13 @@ impl Credentials { let did = Self::generate_did_for_jwk(&key)?; let signer = JwkSigner::new(did.clone(), &private_key).await?; - Ok(Self { signer, did }) + Ok(Self { signer }) } pub async fn new_generate_did_key() -> Result { let (pk, did) = Self::generate_did_and_pk()?; let signer = JwkSigner::new(did.clone(), &pk).await?; - Ok(Self { signer, did }) + Ok(Self { signer }) } fn generate_did_for_jwk(key: &JWK) -> anyhow::Result { diff --git a/runner/src/scenario/ceramic/model_instance.rs b/runner/src/scenario/ceramic/model_instance.rs index 292a37c0..b90c02d9 100644 --- a/runner/src/scenario/ceramic/model_instance.rs +++ b/runner/src/scenario/ceramic/model_instance.rs @@ -258,7 +258,6 @@ impl CeramicModelInstanceTestUser { config, user_info: GooseUserInfo { lead_user, - global_leader, lead_worker: is_goose_lead_worker(), }, small_model_id, From 20285ab4d6282d4b298e1311c132fecdaa52baba Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Mon, 24 Jun 2024 12:39:52 -0600 Subject: [PATCH 3/4] fix: use list of args for migration_cmd --- keramik/src/ipfs.md | 12 ++++++--- operator/src/network/controller.rs | 4 +-- operator/src/network/ipfs.rs | 27 +++++++++---------- operator/src/network/spec.rs | 2 +- runner/src/scenario/ceramic/mod.rs | 6 ++--- runner/src/scenario/ceramic/model_instance.rs | 1 + 6 files changed, 28 insertions(+), 24 deletions(-) diff --git a/keramik/src/ipfs.md b/keramik/src/ipfs.md index b70ab3a6..f706a11b 100644 --- a/keramik/src/ipfs.md +++ b/keramik/src/ipfs.md @@ -181,7 +181,7 @@ blockstore path of `/data/ipfs`) and the Ceramic network set to `dev-unstable`. apiVersion: "keramik.3box.io/v1alpha1" kind: Network metadata: - name: kubo-migration-src + name: basic-network spec: replicas: 5 ceramic: @@ -197,12 +197,18 @@ starting up the node. apiVersion: "keramik.3box.io/v1alpha1" kind: Network metadata: - name: ceramic-one-migration-dest + name: basic-network spec: replicas: 5 ceramic: - ipfs: rust: migrationCmd: - - from-ipfs -i /data/ipfs/blocks -o /data/ipfs/db.sqlite3 --network dev-unstable + - from-ipfs + - -i + - /data/ipfs/blocks + - -o + - /data/ipfs/ + - --network + - dev-unstable ``` diff --git a/operator/src/network/controller.rs b/operator/src/network/controller.rs index fe4604d5..c0a06c7a 100644 --- a/operator/src/network/controller.rs +++ b/operator/src/network/controller.rs @@ -861,8 +861,8 @@ async fn update_peer_status( for ceramic in ceramics { for i in 0..ceramic.info.replicas { let pod_name = ceramic.info.pod_name(i); - let pod = pods.get_status(&pod_name).await?; - if !is_pod_ready(&pod) { + let pod = pods.get_status(&pod_name).await; + if pod.map(|pod| !is_pod_ready(&pod)).unwrap_or(true) { debug!(pod_name, "peer is not ready skipping"); continue; } diff --git a/operator/src/network/ipfs.rs b/operator/src/network/ipfs.rs index 38473a60..7c0c6228 100644 --- a/operator/src/network/ipfs.rs +++ b/operator/src/network/ipfs.rs @@ -105,7 +105,7 @@ pub struct RustIpfsConfig { storage: PersistentStorageConfig, rust_log: String, env: Option>, - migration_cmd: Option, + migration_cmd: Option>, } impl RustIpfsConfig { @@ -273,20 +273,17 @@ impl RustIpfsConfig { } fn init_container(&self, net_config: &NetworkConfig) -> Option { - if let Some(cmd) = &self.migration_cmd { - Some(Container { - command: Some( - vec!["/usr/bin/ceramic-one", "migrations"] - .into_iter() - .chain(cmd.split_whitespace()) - .map(ToOwned::to_owned) - .collect(), - ), - ..self.container(net_config) - }) - } else { - None - } + self.migration_cmd.as_ref().map(|cmd| Container { + name: "ipfs-migration".to_string(), + command: Some( + vec!["/usr/bin/ceramic-one", "migrations"] + .into_iter() + .chain(cmd.iter().map(String::as_str)) + .map(ToOwned::to_owned) + .collect(), + ), + ..self.container(net_config) + }) } } diff --git a/operator/src/network/spec.rs b/operator/src/network/spec.rs index 6c56d67e..f19a4e79 100644 --- a/operator/src/network/spec.rs +++ b/operator/src/network/spec.rs @@ -205,7 +205,7 @@ pub struct RustIpfsSpec { /// CAUTION: Any env vars specified in this set will override any predefined values. pub env: Option>, /// Migration command that should run before a node comes up - pub migration_cmd: Option, + pub migration_cmd: Option>, } /// Describes how the Go IPFS node for a peer should behave. diff --git a/runner/src/scenario/ceramic/mod.rs b/runner/src/scenario/ceramic/mod.rs index 08fdefb8..0aab8cb8 100644 --- a/runner/src/scenario/ceramic/mod.rs +++ b/runner/src/scenario/ceramic/mod.rs @@ -30,7 +30,7 @@ impl Credentials { let did = Document::new(&std::env::var("DID_KEY").expect("DID_KEY is required")); let private_key = std::env::var("DID_PRIVATE_KEY").expect("DID_PRIVATE_KEY is required"); let signer = JwkSigner::new(did.clone(), &private_key).await?; - Ok(Self { signer }) + Ok(Self { signer, did }) } pub async fn admin_from_env() -> Result { @@ -51,13 +51,13 @@ impl Credentials { let did = Self::generate_did_for_jwk(&key)?; let signer = JwkSigner::new(did.clone(), &private_key).await?; - Ok(Self { signer }) + Ok(Self { signer, did }) } pub async fn new_generate_did_key() -> Result { let (pk, did) = Self::generate_did_and_pk()?; let signer = JwkSigner::new(did.clone(), &pk).await?; - Ok(Self { signer }) + Ok(Self { signer, did }) } fn generate_did_for_jwk(key: &JWK) -> anyhow::Result { diff --git a/runner/src/scenario/ceramic/model_instance.rs b/runner/src/scenario/ceramic/model_instance.rs index b90c02d9..220d4fd3 100644 --- a/runner/src/scenario/ceramic/model_instance.rs +++ b/runner/src/scenario/ceramic/model_instance.rs @@ -259,6 +259,7 @@ impl CeramicModelInstanceTestUser { user_info: GooseUserInfo { lead_user, lead_worker: is_goose_lead_worker(), + global_leader, }, small_model_id, small_model_instance_ids, From 5c8adf80b27805054c29ab267cc174e10781ddc2 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 25 Jun 2024 09:29:26 -0600 Subject: [PATCH 4/4] fix: tests --- operator/src/network/controller.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/operator/src/network/controller.rs b/operator/src/network/controller.rs index c0a06c7a..87ce39fa 100644 --- a/operator/src/network/controller.rs +++ b/operator/src/network/controller.rs @@ -4708,7 +4708,18 @@ mod tests { ceramic: Some(vec![CeramicSpec { ipfs: Some(IpfsSpec::Rust(RustIpfsSpec { migration_cmd: Some( - "from-ipfs -i /data/ipfs/blocks -o /data/ipfs/db.sqlite3 --network dev-unstable".to_string(), + vec![ + "from-ipfs", + "-i", + "/data/ipfs/blocks", + "-o", + "/data/ipfs/", + "--network", + "dev-unstable", + ] + .into_iter() + .map(ToOwned::to_owned) + .collect(), ), ..Default::default() })), @@ -4734,7 +4745,7 @@ mod tests { + "-i", + "/data/ipfs/blocks", + "-o", - + "/data/ipfs/db.sqlite3", + + "/data/ipfs/", + "--network", + "dev-unstable" + ], @@ -4778,7 +4789,7 @@ mod tests { + ], + "image": "public.ecr.aws/r5b3e0r5/3box/ceramic-one:latest", + "imagePullPolicy": "Always", - + "name": "ipfs", + + "name": "ipfs-migration", + "ports": [ + { + "containerPort": 4001,