Skip to content

Commit 42c3cf6

Browse files
authored
fix(k8s-driver): use dedicated kube client without read_timeout for watches (#907)
The 30s read_timeout on the shared kube client was killing the long-lived watch streams during idle periods, causing a reconnect cycle every 30 seconds. Use a separate client with no read_timeout for watch_sandboxes so the streams stay open indefinitely.
1 parent bd11395 commit 42c3cf6

File tree

1 file changed

+24
-4
lines changed
  • crates/openshell-driver-kubernetes/src

1 file changed

+24
-4
lines changed

crates/openshell-driver-kubernetes/src/driver.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ const WORKSPACE_SENTINEL: &str = ".workspace-initialized";
102102
#[derive(Clone)]
103103
pub struct KubernetesComputeDriver {
104104
client: Client,
105+
watch_client: Client,
105106
config: KubernetesComputeConfig,
106107
}
107108

@@ -117,17 +118,30 @@ impl std::fmt::Debug for KubernetesComputeDriver {
117118

118119
impl KubernetesComputeDriver {
119120
pub async fn new(config: KubernetesComputeConfig) -> Result<Self, KubeError> {
120-
let mut kube_config = match kube::Config::incluster() {
121+
let base_config = match kube::Config::incluster() {
121122
Ok(c) => c,
122123
Err(_) => kube::Config::infer()
123124
.await
124125
.map_err(kube::Error::InferConfig)?,
125126
};
127+
128+
let mut kube_config = base_config.clone();
126129
kube_config.connect_timeout = Some(Duration::from_secs(10));
127130
kube_config.read_timeout = Some(Duration::from_secs(30));
128131
kube_config.write_timeout = Some(Duration::from_secs(30));
129132
let client = Client::try_from(kube_config)?;
130-
Ok(Self { client, config })
133+
134+
let mut watch_kube_config = base_config;
135+
watch_kube_config.connect_timeout = Some(Duration::from_secs(10));
136+
watch_kube_config.read_timeout = None;
137+
watch_kube_config.write_timeout = Some(Duration::from_secs(30));
138+
let watch_client = Client::try_from(watch_kube_config)?;
139+
140+
Ok(Self {
141+
client,
142+
watch_client,
143+
config,
144+
})
131145
}
132146

133147
pub async fn capabilities(&self) -> Result<GetCapabilitiesResponse, String> {
@@ -155,6 +169,12 @@ impl KubernetesComputeDriver {
155169
self.config.ssh_handshake_skew_secs
156170
}
157171

172+
fn watch_api(&self) -> Api<DynamicObject> {
173+
let gvk = GroupVersionKind::gvk(SANDBOX_GROUP, SANDBOX_VERSION, SANDBOX_KIND);
174+
let resource = ApiResource::from_gvk(&gvk);
175+
Api::namespaced_with(self.watch_client.clone(), &self.config.namespace, &resource)
176+
}
177+
158178
fn api(&self) -> Api<DynamicObject> {
159179
let gvk = GroupVersionKind::gvk(SANDBOX_GROUP, SANDBOX_VERSION, SANDBOX_KIND);
160180
let resource = ApiResource::from_gvk(&gvk);
@@ -392,8 +412,8 @@ impl KubernetesComputeDriver {
392412

393413
pub async fn watch_sandboxes(&self) -> Result<WatchStream, String> {
394414
let namespace = self.config.namespace.clone();
395-
let sandbox_api = self.api();
396-
let event_api: Api<KubeEventObj> = Api::namespaced(self.client.clone(), &namespace);
415+
let sandbox_api = self.watch_api();
416+
let event_api: Api<KubeEventObj> = Api::namespaced(self.watch_client.clone(), &namespace);
397417
let mut sandbox_stream = watcher::watcher(sandbox_api, watcher::Config::default()).boxed();
398418
let mut event_stream = watcher::watcher(event_api, watcher::Config::default()).boxed();
399419
let (tx, rx) = mpsc::channel(256);

0 commit comments

Comments
 (0)