Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose c1 migration cmds #186

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions keramik/src/ipfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,46 @@ spec:
commands:
- ipfs config --json Swarm.RelayClient.Enabled false
```

## Migration from Kubo to Ceramic One

A Kubo blockstore can be migrated to Ceramic One by specifying the migration command in the IPFS configuration.

Example [network config](./setup_network.md) that uses Go based IPFS (i.e. Kubo) with its defaults for Ceramic (including a default
blockstore path of `/data/ipfs`) and the Ceramic network set to `dev-unstable`.

```yaml
apiVersion: "keramik.3box.io/v1alpha1"
kind: Network
metadata:
name: basic-network
spec:
replicas: 5
ceramic:
- ipfs:
go: {}
networkType: dev-unstable
```

Example [network config](./setup_network.md) that uses Ceramic One and specifies what migration command to run before
starting up the node.

```yaml
apiVersion: "keramik.3box.io/v1alpha1"
kind: Network
metadata:
name: basic-network
spec:
replicas: 5
ceramic:
- ipfs:
rust:
migrationCmd:
- from-ipfs
- -i
- /data/ipfs/blocks
- -o
- /data/ipfs/
- --network
- dev-unstable
```
1 change: 1 addition & 0 deletions operator/src/network/cas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ pub fn cas_ipfs_stateful_set_spec(
..Default::default()
}),
spec: Some(PodSpec {
init_containers: config.ipfs.init_container(net_config).map(|c| vec![c]),
containers: vec![config.ipfs.container(ipfs_info, net_config)],
volumes: Some(volumes),
..Default::default()
Expand Down
61 changes: 33 additions & 28 deletions operator/src/network/ceramic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,35 +664,40 @@ pub fn stateful_set_spec(ns: &str, bundle: &CeramicBundle<'_>) -> StatefulSetSpe
.ipfs
.container(&bundle.info, bundle.net_config),
],
init_containers: Some(vec![Container {
command: Some(vec![
"/bin/bash".to_owned(),
"-c".to_owned(),
"/ceramic-init/ceramic-init.sh".to_owned(),
]),
env: Some(init_env),
image: Some(bundle.config.init_image_name.to_owned()),
image_pull_policy: Some(bundle.config.image_pull_policy.to_owned()),
name: "init-ceramic-config".to_owned(),
resources: Some(ResourceRequirements {
limits: Some(bundle.config.resource_limits.clone().into()),
requests: Some(bundle.config.resource_limits.clone().into()),
..Default::default()
}),
volume_mounts: Some(vec![
VolumeMount {
mount_path: "/config".to_owned(),
name: "config-volume".to_owned(),
..Default::default()
},
VolumeMount {
mount_path: "/ceramic-init".to_owned(),
name: "ceramic-init".to_owned(),
init_containers: Some(
vec![Container {
command: Some(vec![
"/bin/bash".to_owned(),
"-c".to_owned(),
"/ceramic-init/ceramic-init.sh".to_owned(),
]),
env: Some(init_env),
image: Some(bundle.config.init_image_name.to_owned()),
image_pull_policy: Some(bundle.config.image_pull_policy.to_owned()),
name: "init-ceramic-config".to_owned(),
resources: Some(ResourceRequirements {
limits: Some(bundle.config.resource_limits.clone().into()),
requests: Some(bundle.config.resource_limits.clone().into()),
..Default::default()
},
]),
..Default::default()
}]),
}),
volume_mounts: Some(vec![
VolumeMount {
mount_path: "/config".to_owned(),
name: "config-volume".to_owned(),
..Default::default()
},
VolumeMount {
mount_path: "/ceramic-init".to_owned(),
name: "ceramic-init".to_owned(),
..Default::default()
},
]),
..Default::default()
}]
.into_iter()
.chain(bundle.config.ipfs.init_container(bundle.net_config))
.collect(),
),
volumes: Some(volumes),
security_context: Some(PodSecurityContext {
fs_group: Some(70),
Expand Down
145 changes: 143 additions & 2 deletions operator/src/network/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -861,8 +861,8 @@ async fn update_peer_status(
for ceramic in ceramics {
for i in 0..ceramic.info.replicas {
let pod_name = ceramic.info.pod_name(i);
let pod = pods.get_status(&pod_name).await?;
if !is_pod_ready(&pod) {
let pod = pods.get_status(&pod_name).await;
if pod.map(|pod| !is_pod_ready(&pod)).unwrap_or(true) {
debug!(pod_name, "peer is not ready skipping");
continue;
}
Expand Down Expand Up @@ -4700,4 +4700,145 @@ mod tests {
.expect("reconciler");
timeout_after_1s(mocksrv).await;
}
#[tokio::test]
#[traced_test]
async fn migration_cmd() {
// Setup network spec and status
let network = Network::test().with_spec(NetworkSpec {
ceramic: Some(vec![CeramicSpec {
ipfs: Some(IpfsSpec::Rust(RustIpfsSpec {
migration_cmd: Some(
vec![
"from-ipfs",
"-i",
"/data/ipfs/blocks",
"-o",
"/data/ipfs/",
"--network",
"dev-unstable",
]
.into_iter()
.map(ToOwned::to_owned)
.collect(),
),
..Default::default()
})),
..Default::default()
}]),
..Default::default()
});
let mock_rpc_client = default_ipfs_rpc_mock();
let mut stub = Stub::default().with_network(network.clone());
stub.ceramics[0].stateful_set.patch(expect![[r#"
--- original
+++ modified
@@ -397,6 +397,95 @@
"name": "ceramic-init"
}
]
+ },
+ {
+ "command": [
+ "/usr/bin/ceramic-one",
+ "migrations",
+ "from-ipfs",
+ "-i",
+ "/data/ipfs/blocks",
+ "-o",
+ "/data/ipfs/",
+ "--network",
+ "dev-unstable"
+ ],
+ "env": [
+ {
+ "name": "CERAMIC_ONE_BIND_ADDRESS",
+ "value": "0.0.0.0:5001"
+ },
+ {
+ "name": "CERAMIC_ONE_KADEMLIA_PARALLELISM",
+ "value": "1"
+ },
+ {
+ "name": "CERAMIC_ONE_KADEMLIA_REPLICATION",
+ "value": "6"
+ },
+ {
+ "name": "CERAMIC_ONE_LOCAL_NETWORK_ID",
+ "value": "0"
+ },
+ {
+ "name": "CERAMIC_ONE_METRICS_BIND_ADDRESS",
+ "value": "0.0.0.0:9465"
+ },
+ {
+ "name": "CERAMIC_ONE_NETWORK",
+ "value": "local"
+ },
+ {
+ "name": "CERAMIC_ONE_STORE_DIR",
+ "value": "/data/ipfs"
+ },
+ {
+ "name": "CERAMIC_ONE_SWARM_ADDRESSES",
+ "value": "/ip4/0.0.0.0/tcp/4001"
+ },
+ {
+ "name": "RUST_LOG",
+ "value": "info,ceramic_one=debug,multipart=error"
+ }
+ ],
+ "image": "public.ecr.aws/r5b3e0r5/3box/ceramic-one:latest",
+ "imagePullPolicy": "Always",
+ "name": "ipfs-migration",
+ "ports": [
+ {
+ "containerPort": 4001,
+ "name": "swarm-tcp",
+ "protocol": "TCP"
+ },
+ {
+ "containerPort": 5001,
+ "name": "rpc",
+ "protocol": "TCP"
+ },
+ {
+ "containerPort": 9465,
+ "name": "metrics",
+ "protocol": "TCP"
+ }
+ ],
+ "resources": {
+ "limits": {
+ "cpu": "1",
+ "ephemeral-storage": "1Gi",
+ "memory": "1Gi"
+ },
+ "requests": {
+ "cpu": "1",
+ "ephemeral-storage": "1Gi",
+ "memory": "1Gi"
+ }
+ },
+ "volumeMounts": [
+ {
+ "mountPath": "/data/ipfs",
+ "name": "ipfs-data"
+ }
+ ]
}
],
"securityContext": {
"#]]);
stub.cas_ipfs_stateful_set.patch(expect![[r#"
--- original
+++ modified
"#]]);
let (testctx, api_handle) = Context::test(mock_rpc_client);
let fakeserver = ApiServerVerifier::new(api_handle);
let mocksrv = stub.run(fakeserver);
reconcile(Arc::new(network), testctx)
.await
.expect("reconciler");
timeout_after_1s(mocksrv).await;
}
}
30 changes: 27 additions & 3 deletions operator/src/network/ipfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use k8s_openapi::{
};

const IPFS_CONTAINER_NAME: &str = "ipfs";
const IPFS_STORE_DIR: &str = "/data/ipfs";
pub const IPFS_DATA_PV_CLAIM: &str = "ipfs-data";
const IPFS_SERVICE_PORT: i32 = 5001;

Expand Down Expand Up @@ -69,6 +70,12 @@ impl IpfsConfig {
IpfsConfig::Go(config) => config.config_maps(&info),
}
}
pub fn init_container(&self, net_config: &NetworkConfig) -> Option<Container> {
match self {
IpfsConfig::Rust(config) => config.init_container(net_config),
_ => None,
}
}
pub fn container(&self, info: impl Into<IpfsInfo>, net_config: &NetworkConfig) -> Container {
let info = info.into();
match self {
Expand Down Expand Up @@ -98,6 +105,7 @@ pub struct RustIpfsConfig {
storage: PersistentStorageConfig,
rust_log: String,
env: Option<BTreeMap<String, String>>,
migration_cmd: Option<Vec<String>>,
}

impl RustIpfsConfig {
Expand Down Expand Up @@ -129,6 +137,7 @@ impl Default for RustIpfsConfig {
},
rust_log: "info,ceramic_one=debug,multipart=error".to_owned(),
env: None,
migration_cmd: None,
}
}
}
Expand All @@ -149,6 +158,7 @@ impl From<RustIpfsSpec> for RustIpfsConfig {
storage: PersistentStorageConfig::from_spec(value.storage, default.storage),
rust_log: value.rust_log.unwrap_or(default.rust_log),
env: value.env,
migration_cmd: value.migration_cmd,
}
}
}
Expand Down Expand Up @@ -177,7 +187,7 @@ impl RustIpfsConfig {
},
EnvVar {
name: "CERAMIC_ONE_STORE_DIR".to_owned(),
value: Some("/data/ipfs".to_owned()),
value: Some(IPFS_STORE_DIR.to_owned()),
..Default::default()
},
EnvVar {
Expand Down Expand Up @@ -253,14 +263,28 @@ impl RustIpfsConfig {
..Default::default()
}),
volume_mounts: Some(vec![VolumeMount {
mount_path: "/data/ipfs".to_owned(),
mount_path: IPFS_STORE_DIR.to_owned(),
name: IPFS_DATA_PV_CLAIM.to_owned(),
..Default::default()
}]),
security_context: net_config.debug_mode.then(debug_mode_security_context),
..Default::default()
}
}

fn init_container(&self, net_config: &NetworkConfig) -> Option<Container> {
self.migration_cmd.as_ref().map(|cmd| Container {
name: "ipfs-migration".to_string(),
command: Some(
vec!["/usr/bin/ceramic-one", "migrations"]
.into_iter()
.chain(cmd.iter().map(String::as_str))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we moved the chain below the map on the next line, could we take out the as_str mapping?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we do not own the strings in cmd so the as_str converts from &String to &str. We could instead convert directly to String but that still requires a map on the cmd iterator.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So its about the same either way, I don't have a strong reason for one over the other.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah understood, thanks.

.map(ToOwned::to_owned)
.collect(),
),
..self.container(net_config)
})
}
}

pub struct GoIpfsConfig {
Expand Down Expand Up @@ -364,7 +388,7 @@ ipfs config --json Swarm.ResourceMgr.MaxFileDescriptors 500000
fn container(&self, info: &IpfsInfo) -> Container {
let mut volume_mounts = vec![
VolumeMount {
mount_path: "/data/ipfs".to_owned(),
mount_path: IPFS_STORE_DIR.to_owned(),
name: IPFS_DATA_PV_CLAIM.to_owned(),
..Default::default()
},
Expand Down
2 changes: 2 additions & 0 deletions operator/src/network/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ pub struct RustIpfsSpec {
/// Extra env values to pass to the image.
/// CAUTION: Any env vars specified in this set will override any predefined values.
pub env: Option<BTreeMap<String, String>>,
/// Migration command that should run before a node comes up
pub migration_cmd: Option<Vec<String>>,
}

/// Describes how the Go IPFS node for a peer should behave.
Expand Down
2 changes: 1 addition & 1 deletion runner/src/scenario/ceramic/model_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ impl CeramicModelInstanceTestUser {
config,
user_info: GooseUserInfo {
lead_user,
global_leader,
lead_worker: is_goose_lead_worker(),
global_leader,
},
small_model_id,
small_model_instance_ids,
Expand Down
Loading