Skip to content

Commit

Permalink
feat: wait for pod ready
Browse files Browse the repository at this point in the history
Prior to this change keramik only checked the Ceramic API
Now its checks the pod ready condition.
  • Loading branch information
nathanielc committed Oct 5, 2023
1 parent 894f936 commit 8698629
Show file tree
Hide file tree
Showing 37 changed files with 383 additions and 8,109 deletions.
4 changes: 4 additions & 0 deletions operator/src/network/ceramic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ impl CeramicInfo {
pub fn new_name(&self, name: &str) -> String {
format!("{name}-{}", self.suffix)
}
/// Determine the pod name
pub fn pod_name(&self, peer: i32) -> String {
format!("{}-{peer}", self.stateful_set)
}
/// Determine the IPFS RPC address of a Ceramic peer
pub fn ipfs_rpc_addr(&self, ns: &str, peer: i32) -> String {
format!(
Expand Down
251 changes: 205 additions & 46 deletions operator/src/network/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,17 @@ async fn update_peer_status(
// Forget all previous status
status.peers.clear();

let pods: Api<Pod> = Api::namespaced(cx.k_client.clone(), ns);

// Check status of all ceramic peers first
for ceramic in ceramics {
for i in 0..ceramic.info.replicas {
let pod_name = ceramic.info.pod_name(i);
let pod = pods.get_status(&pod_name).await?;
if !is_pod_ready(&pod) {
debug!(pod_name, "peer is not ready skipping");
continue;
}
let ipfs_rpc_addr = ceramic.info.ipfs_rpc_addr(ns, i);
let info = match cx.rpc_client.peer_info(&ipfs_rpc_addr).await {
Ok(res) => res,
Expand Down Expand Up @@ -640,6 +648,19 @@ async fn update_peer_status(
Ok(min_connected_peers)
}

fn is_pod_ready(pod: &Pod) -> bool {
if let Some(status) = &pod.status {
if let Some(conditions) = &status.conditions {
for condition in conditions {
if condition.type_ == "Ready" && condition.status == "True" {
return true;
}
}
}
}
false
}

async fn is_secret_missing(
cx: Arc<Context<impl IpfsRpcClient, impl RngCore>>,
ns: &str,
Expand Down Expand Up @@ -732,7 +753,7 @@ mod tests {
use k8s_openapi::{
api::{
batch::v1::{Job, JobStatus},
core::v1::Secret,
core::v1::{Pod, PodCondition, PodStatus, Secret},
},
apimachinery::pkg::api::resource::Quantity,
ByteString,
Expand Down Expand Up @@ -794,6 +815,26 @@ mod tests {
});
}

fn not_ready_pod_status() -> Option<Pod> {
Some(Pod {
status: None,
..Default::default()
})
}
fn ready_pod_status() -> Option<Pod> {
Some(Pod {
status: Some(PodStatus {
conditions: Some(vec![PodCondition {
status: "True".to_string(),
type_: "Ready".to_string(),
..Default::default()
}]),
..Default::default()
}),
..Default::default()
})
}

// This tests defines the default stubs,
// meaning the default stubs are the request response pairs
// that occur when reconiling a default spec and status.
Expand All @@ -813,7 +854,7 @@ mod tests {
}
#[tokio::test]
#[traced_test]
async fn reconcile_two_peers() {
async fn reconcile_two_peers_simple() {
// Setup network spec and status
let network = Network::test()
.with_spec(NetworkSpec {
Expand Down Expand Up @@ -864,6 +905,14 @@ mod tests {
"matchLabels": {
"app": "ceramic"
"#]]);
stub.ceramic_pod_status.push((
expect_file!["./testdata/ceramic_pod_status-0-0"].into(),
ready_pod_status(),
));
stub.ceramic_pod_status.push((
expect_file!["./testdata/ceramic_pod_status-0-1"].into(),
ready_pod_status(),
));
stub.keramik_peers_configmap.patch(expect![[r#"
--- original
+++ modified
Expand Down Expand Up @@ -938,6 +987,101 @@ mod tests {
expect_file!["./testdata/bootstrap_job_two_peers_apply"],
Some(Job::default()),
));

let (testctx, api_handle) = Context::test(mock_rpc_client);
let fakeserver = ApiServerVerifier::new(api_handle);
let mocksrv = stub.run(fakeserver);
reconcile(Arc::new(network), testctx)
.await
.expect("reconciler");
timeout_after_1s(mocksrv).await;
}

#[tokio::test]
#[traced_test]
async fn reconcile_two_peers_not_ready() {
// Setup network spec and status
let network = Network::test()
.with_spec(NetworkSpec {
replicas: 2,
..Default::default()
})
.with_status(NetworkStatus {
replicas: 2,
ready_replicas: 0,
namespace: Some("keramik-test".to_owned()),
peers: vec![],
});
// Setup peer info
let mut mock_rpc_client = MockIpfsRpcClientTest::new();
// We expect only cas will be checked since both pods report they are not ready
mock_cas_peer_info_ready(&mut mock_rpc_client);
mock_connected_peer_status(&mut mock_rpc_client);

let mut stub = Stub::default().with_network(network.clone());
// Patch expected request values
stub.ceramics[0].stateful_set.patch(expect![[r#"
--- original
+++ modified
@@ -17,7 +17,7 @@
},
"spec": {
"podManagementPolicy": "Parallel",
- "replicas": 0,
+ "replicas": 2,
"selector": {
"matchLabels": {
"app": "ceramic"
"#]]);
stub.ceramic_pod_status.push((
expect_file!["./testdata/ceramic_pod_status-0-0"].into(),
not_ready_pod_status(),
));
stub.ceramic_pod_status.push((
expect_file!["./testdata/ceramic_pod_status-0-1"].into(),
not_ready_pod_status(),
));
stub.keramik_peers_configmap.patch(expect![[r#"
--- original
+++ modified
@@ -9,7 +9,7 @@
"apiVersion": "v1",
"kind": "ConfigMap",
"data": {
- "peers.json": "[]"
+ "peers.json": "[{\"ipfs\":{\"peerId\":\"cas_peer_id\",\"ipfsRpcAddr\":\"http://cas-ipfs:5001\",\"p2pAddrs\":[\"/ip4/10.0.0.3/tcp/4001/p2p/cas_peer_id\"]}}]"
},
"metadata": {
"labels": {
"#]]);
stub.status.patch(expect![[r#"
--- original
+++ modified
@@ -7,10 +7,20 @@
},
body: {
"status": {
- "replicas": 0,
+ "replicas": 2,
"readyReplicas": 0,
- "namespace": null,
- "peers": []
+ "namespace": "keramik-test",
+ "peers": [
+ {
+ "ipfs": {
+ "peerId": "cas_peer_id",
+ "ipfsRpcAddr": "http://cas-ipfs:5001",
+ "p2pAddrs": [
+ "/ip4/10.0.0.3/tcp/4001/p2p/cas_peer_id"
+ ]
+ }
+ }
+ ]
}
},
}
"#]]);
let (testctx, api_handle) = Context::test(mock_rpc_client);
let fakeserver = ApiServerVerifier::new(api_handle);
let mocksrv = stub.run(fakeserver);
Expand Down Expand Up @@ -998,6 +1142,14 @@ mod tests {
"matchLabels": {
"app": "ceramic"
"#]]);
stub.ceramic_pod_status.push((
expect_file!["./testdata/ceramic_pod_status-0-0"].into(),
ready_pod_status(),
));
stub.ceramic_pod_status.push((
expect_file!["./testdata/ceramic_pod_status-0-1"].into(),
ready_pod_status(),
));
stub.keramik_peers_configmap.patch(expect![[r#"
--- original
+++ modified
Expand Down Expand Up @@ -1135,6 +1287,14 @@ mod tests {
"matchLabels": {
"app": "ceramic"
"#]]);
stub.ceramic_pod_status.push((
expect_file!["./testdata/ceramic_pod_status-0-0"].into(),
ready_pod_status(),
));
stub.ceramic_pod_status.push((
expect_file!["./testdata/ceramic_pod_status-0-1"].into(),
ready_pod_status(),
));
stub.keramik_peers_configmap.patch(expect![[r#"
--- original
+++ modified
Expand Down Expand Up @@ -1264,6 +1424,14 @@ mod tests {
"matchLabels": {
"app": "ceramic"
"#]]);
stub.ceramic_pod_status.push((
expect_file!["./testdata/ceramic_pod_status-0-0"].into(),
ready_pod_status(),
));
stub.ceramic_pod_status.push((
expect_file!["./testdata/ceramic_pod_status-0-1"].into(),
ready_pod_status(),
));
stub.keramik_peers_configmap.patch(expect![[r#"
--- original
+++ modified
Expand Down Expand Up @@ -1386,6 +1554,14 @@ mod tests {
"matchLabels": {
"app": "ceramic"
"#]]);
stub.ceramic_pod_status.push((
expect_file!["./testdata/ceramic_pod_status-0-0"].into(),
ready_pod_status(),
));
stub.ceramic_pod_status.push((
expect_file!["./testdata/ceramic_pod_status-0-1"].into(),
ready_pod_status(),
));
stub.keramik_peers_configmap.patch(expect![[r#"
--- original
+++ modified
Expand Down Expand Up @@ -2736,59 +2912,31 @@ mod tests {
#[tokio::test]
async fn multiple_weighted_ceramics() {
// Setup network spec and status
let weights = [10, 2, 1, 1, 1, 1, 1, 1, 1, 1];
let replicas = 20;
// To keep it simple we want weights to be the replica counts
// Assert this is true.
assert_eq!(weights.iter().sum::<i32>(), replicas);
let network = Network::test().with_spec(NetworkSpec {
replicas: 1024,
ceramic: vec![
CeramicSpec {
weight: Some(512),
..Default::default()
},
CeramicSpec {
weight: Some(256),
..Default::default()
},
CeramicSpec {
weight: Some(128),
..Default::default()
},
CeramicSpec {
weight: Some(64),
..Default::default()
},
CeramicSpec {
weight: Some(32),
..Default::default()
},
CeramicSpec {
weight: Some(16),
replicas,
ceramic: weights
.iter()
.map(|w| CeramicSpec {
weight: Some(*w),
..Default::default()
},
CeramicSpec {
weight: Some(8),
..Default::default()
},
CeramicSpec {
weight: Some(4),
..Default::default()
},
CeramicSpec {
weight: Some(2),
..Default::default()
},
CeramicSpec {
weight: Some(1),
..Default::default()
},
],
})
.collect(),

..Default::default()
});
let mock_rpc_client = ipfs_rpc_mock_n(1025);
// + 1 for cas
let mock_rpc_client = ipfs_rpc_mock_n(replicas as usize + 1);
let mut stub = Stub::default().with_network(network.clone());
// Remove first deletes
stub.ceramic_deletes = Vec::new();
// Expect new ceramics
stub.ceramics = Vec::new();
for i in 0..10 {
for i in 0..weights.len() {
stub.ceramics.push(CeramicStub {
configmaps: vec![
expect_file!["./testdata/default_stubs/ceramic_init_configmap"].into(),
Expand All @@ -2797,6 +2945,17 @@ mod tests {
service: expect_file![format!("./testdata/ceramic_svc_weighted_{i}")].into(),
});
}
for (i, w) in weights.iter().enumerate() {
for j in 0..*w {
stub.ceramic_pod_status.push((
expect_file![format!(
"./testdata/multiple_weighted_ceramics/ceramic_pod_status-{i}-{j}"
)]
.into(),
ready_pod_status(),
));
}
}
stub.keramik_peers_configmap = expect_file!["./testdata/ceramic_weighted_peers"].into();
// Bootstrap is applied if we have at least two peers.
// However we do not expect to see any GET/DELETE for the bootstrap job as all peers report
Expand Down
Loading

0 comments on commit 8698629

Please sign in to comment.