hydro_deploy/
deployment.rs

1#![expect(
2    mismatched_lifetime_syntaxes,
3    reason = "https://github.com/BrynCooke/buildstructor/issues/200"
4)]
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::sync::{Arc, Weak};
9
10use anyhow::Result;
11use futures::{FutureExt, StreamExt, TryStreamExt};
12use tokio::sync::RwLock;
13
14use super::aws::AwsNetwork;
15use super::gcp::GcpNetwork;
16use super::{
17    CustomService, GcpComputeEngineHost, Host, LocalhostHost, ResourcePool, ResourceResult,
18    Service, progress,
19};
20use crate::{AwsEc2Host, AzureHost, HostTargetType, ServiceBuilder};
21
22pub struct Deployment {
23    pub hosts: Vec<Weak<dyn Host>>,
24    pub services: Vec<Weak<RwLock<dyn Service>>>,
25    pub resource_pool: ResourcePool,
26    localhost_host: Option<Arc<LocalhostHost>>,
27    last_resource_result: Option<Arc<ResourceResult>>,
28    next_host_id: usize,
29    next_service_id: usize,
30}
31
32impl Default for Deployment {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38impl Deployment {
39    pub fn new() -> Self {
40        let mut ret = Self {
41            hosts: Vec::new(),
42            services: Vec::new(),
43            resource_pool: ResourcePool::default(),
44            localhost_host: None,
45            last_resource_result: None,
46            next_host_id: 0,
47            next_service_id: 0,
48        };
49
50        ret.localhost_host = Some(ret.add_host(LocalhostHost::new));
51        ret
52    }
53
54    #[expect(non_snake_case, reason = "constructor-esque")]
55    pub fn Localhost(&self) -> Arc<LocalhostHost> {
56        self.localhost_host.clone().unwrap()
57    }
58
59    #[expect(non_snake_case, reason = "constructor-esque")]
60    pub fn CustomService(
61        &mut self,
62        on: Arc<dyn Host>,
63        external_ports: Vec<u16>,
64    ) -> Arc<RwLock<CustomService>> {
65        self.add_service(|id, on| CustomService::new(id, on, external_ports), on)
66    }
67
68    /// Runs `deploy()`, and `start()`, waits for the trigger future, then runs `stop()`.
69    pub async fn run_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
70        // TODO(mingwei): should `trigger` interrupt `deploy()` and `start()`? If so make sure shutdown works as expected.
71        self.deploy().await?;
72        self.start().await?;
73        trigger.await;
74        self.stop().await?;
75        Ok(())
76    }
77
78    /// Runs `start()`, waits for the trigger future, then runs `stop()`.
79    /// This is useful if you need to initiate external network connections between
80    /// `deploy()` and `start()`.
81    pub async fn start_until(&mut self, trigger: impl Future<Output = ()>) -> Result<()> {
82        // TODO(mingwei): should `trigger` interrupt `deploy()` and `start()`? If so make sure shutdown works as expected.
83        self.start().await?;
84        trigger.await;
85        self.stop().await?;
86        Ok(())
87    }
88
89    /// Runs `deploy()`, and `start()`, waits for CTRL+C, then runs `stop()`.
90    pub async fn run_ctrl_c(&mut self) -> Result<()> {
91        self.run_until(tokio::signal::ctrl_c().map(|_| ())).await
92    }
93
94    /// Runs `start()`, waits for CTRL+C, then runs `stop()`.
95    /// This is useful if you need to initiate external network connections between
96    /// `deploy()` and `start()`.
97    pub async fn start_ctrl_c(&mut self) -> Result<()> {
98        self.start_until(tokio::signal::ctrl_c().map(|_| ())).await
99    }
100
101    pub async fn deploy(&mut self) -> Result<()> {
102        self.services.retain(|weak| weak.strong_count() > 0);
103
104        progress::ProgressTracker::with_group("deploy", Some(3), || async {
105            let mut resource_batch = super::ResourceBatch::new();
106
107            for service in self.services.iter().filter_map(Weak::upgrade) {
108                service.read().await.collect_resources(&mut resource_batch);
109            }
110
111            for host in self.hosts.iter().filter_map(Weak::upgrade) {
112                host.collect_resources(&mut resource_batch);
113            }
114
115            let resource_result = Arc::new(
116                progress::ProgressTracker::with_group("provision", Some(1), || async {
117                    resource_batch
118                        .provision(&mut self.resource_pool, self.last_resource_result.clone())
119                        .await
120                })
121                .await?,
122            );
123            self.last_resource_result = Some(resource_result.clone());
124
125            for host in self.hosts.iter().filter_map(Weak::upgrade) {
126                host.provision(&resource_result);
127            }
128
129            let upgraded_services = self
130                .services
131                .iter()
132                .filter_map(Weak::upgrade)
133                .collect::<Vec<_>>();
134
135            progress::ProgressTracker::with_group("prepare", Some(upgraded_services.len()), || {
136                let services_future = upgraded_services
137                    .iter()
138                    .map(|service: &Arc<RwLock<dyn Service>>| {
139                        let resource_result = &resource_result;
140                        async move { service.write().await.deploy(resource_result).await }
141                    })
142                    .collect::<Vec<_>>();
143
144                futures::stream::iter(services_future)
145                    .buffer_unordered(16)
146                    .try_fold((), |_, _| async { Ok(()) })
147            })
148            .await?;
149
150            progress::ProgressTracker::with_group("ready", Some(upgraded_services.len()), || {
151                let all_services_ready =
152                    upgraded_services
153                        .iter()
154                        .map(|service: &Arc<RwLock<dyn Service>>| async move {
155                            service.write().await.ready().await?;
156                            Ok(()) as Result<()>
157                        });
158
159                futures::future::try_join_all(all_services_ready)
160            })
161            .await?;
162
163            Ok(())
164        })
165        .await
166    }
167
168    pub async fn start(&mut self) -> Result<()> {
169        self.services.retain(|weak| weak.strong_count() > 0);
170
171        progress::ProgressTracker::with_group("start", None, || {
172            let all_services_start = self.services.iter().filter_map(Weak::upgrade).map(
173                |service: Arc<RwLock<dyn Service>>| async move {
174                    service.write().await.start().await?;
175                    Ok(()) as Result<()>
176                },
177            );
178
179            futures::future::try_join_all(all_services_start)
180        })
181        .await?;
182        Ok(())
183    }
184
185    pub async fn stop(&mut self) -> Result<()> {
186        self.services.retain(|weak| weak.strong_count() > 0);
187
188        progress::ProgressTracker::with_group("stop", None, || {
189            let all_services_stop = self.services.iter().filter_map(Weak::upgrade).map(
190                |service: Arc<RwLock<dyn Service>>| async move {
191                    service.write().await.stop().await?;
192                    Ok(()) as Result<()>
193                },
194            );
195
196            futures::future::try_join_all(all_services_stop)
197        })
198        .await?;
199        Ok(())
200    }
201}
202
203impl Deployment {
204    pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(&mut self, host: F) -> Arc<T> {
205        let arc = Arc::new(host(self.next_host_id));
206        self.next_host_id += 1;
207
208        self.hosts.push(Arc::downgrade(&arc) as Weak<dyn Host>);
209        arc
210    }
211
212    pub fn add_service<T: Service + 'static>(
213        &mut self,
214        service: impl ServiceBuilder<Service = T>,
215        on: Arc<dyn Host>,
216    ) -> Arc<RwLock<T>> {
217        let arc = Arc::new(RwLock::new(service.build(self.next_service_id, on)));
218        self.next_service_id += 1;
219
220        self.services
221            .push(Arc::downgrade(&arc) as Weak<RwLock<dyn Service>>);
222        arc
223    }
224}
225
226/// Buildstructor methods.
227#[buildstructor::buildstructor]
228impl Deployment {
229    #[builder(entry = "GcpComputeEngineHost", exit = "add")]
230    pub fn add_gcp_compute_engine_host(
231        &mut self,
232        project: String,
233        machine_type: String,
234        image: String,
235        target_type: Option<HostTargetType>,
236        region: String,
237        network: Arc<RwLock<GcpNetwork>>,
238        user: Option<String>,
239        display_name: Option<String>,
240    ) -> Arc<GcpComputeEngineHost> {
241        self.add_host(|id| {
242            GcpComputeEngineHost::new(
243                id,
244                project,
245                machine_type,
246                image,
247                target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
248                region,
249                network,
250                user,
251                display_name,
252            )
253        })
254    }
255
256    #[builder(entry = "AzureHost", exit = "add")]
257    pub fn add_azure_host(
258        &mut self,
259        project: String,
260        os_type: String, // linux or windows
261        machine_size: String,
262        image: Option<HashMap<String, String>>,
263        target_type: Option<HostTargetType>,
264        region: String,
265        user: Option<String>,
266    ) -> Arc<AzureHost> {
267        self.add_host(|id| {
268            AzureHost::new(
269                id,
270                project,
271                os_type,
272                machine_size,
273                image,
274                target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
275                region,
276                user,
277            )
278        })
279    }
280
281    #[builder(entry = "AwsEc2Host", exit = "add")]
282    pub fn add_aws_ec2_host(
283        &mut self,
284        region: String,
285        instance_type: String,
286        target_type: Option<HostTargetType>,
287        ami: String,
288        network: Arc<RwLock<AwsNetwork>>,
289        user: Option<String>,
290        display_name: Option<String>,
291    ) -> Arc<AwsEc2Host> {
292        self.add_host(|id| {
293            AwsEc2Host::new(
294                id,
295                region,
296                instance_type,
297                target_type.unwrap_or(HostTargetType::Linux(crate::LinuxCompileType::Musl)),
298                ami,
299                network,
300                user,
301                display_name,
302            )
303        })
304    }
305}