Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 921757c

Browse files
chore: refactoring code for user workflow observability
1 parent 63f26c0 commit 921757c

File tree

11 files changed

+209
-77
lines changed

11 files changed

+209
-77
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,4 @@
11
-- Add migration script here
2+
ALTER TABLE idempotency ALTER COLUMN response_status_code DROP NOT NULL;
3+
ALTER TABLE idempotency ALTER COLUMN response_body DROP NOT NULL;
4+
ALTER TABLE idempotency ALTER COLUMN response_headers DROP NOT NULL;
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-- Add migration script here
2+
CREATE TABLE newsletter_issues (
3+
newsletter_issue_id uuid NOT NULL,
4+
title TEXT NOT NULL,
5+
text_content TEXT NOT NULL,
6+
html_content TEXT NOT NULL,
7+
published_at TEXT NOT NULL,
8+
PRIMARY KEY(newsletter_issue_id)
9+
);
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
-- Add migration script here
2+
CREATE TABLE issue_delivery_queue (
3+
newsletter_issue_id uuid NOT NULL REFERENCES newsletter_issues (newsletter_issue_id),
4+
subscriber_email TEXT NOT NULL,
5+
PRIMARY KEY(newsletter_issue_id, subscriber_email)
6+
);

‎src/configuration.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,7 @@ impl EmailClientSettings {
3636
pub fn client(self) -> EmailClient {
3737
let sender = self.sender().expect("Invalid sender email address");
3838
let timeout = std::time::Duration::from_millis(self.timeout_ms);
39-
EmailClient::new(
40-
self.base_url,
41-
sender,
42-
self.authorization_token,
43-
timeout,
44-
)
39+
EmailClient::new(self.base_url, sender, self.authorization_token, timeout)
4540
}
4641
pub fn sender(&self) -> Result<SubscriberEmail, String> {
4742
SubscriberEmail::parse(self.sender_email.clone())

‎src/idempotency/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ mod persistence;
33
pub use key::IdempotencyKey;
44
pub use persistence::get_saved_response;
55
pub use persistence::save_response;
6+
pub use persistence::{try_processing, NextAction};

‎src/idempotency/persistence.rs

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use super::IdempotencyKey;
22
use actix_web::body::to_bytes;
3-
use actix_web::http::{self,StatusCode};
3+
use actix_web::http::StatusCode;
44
use actix_web::HttpResponse;
55
use sqlx::postgres::PgHasArrayType;
66
use sqlx::PgPool;
7+
use sqlx::{Postgres, Transaction};
78
use uuid::Uuid;
89

910
#[derive(Debug, sqlx::Type)]
@@ -27,9 +28,9 @@ pub async fn get_saved_response(
2728
let saved_response = sqlx::query!(
2829
r#"
2930
SELECT
30-
response_status_code,
31-
response_headers as "response_headers: Vec<HeaderPairRecord>",
32-
response_body
31+
response_status_code as "response_status_code!",
32+
response_headers as "response_headers!: Vec<HeaderPairRecord>",
33+
response_body as "response_body!"
3334
FROM idempotency
3435
WHERE
3536
user_id = 1ドル AND
@@ -40,6 +41,7 @@ pub async fn get_saved_response(
4041
)
4142
.fetch_optional(pool)
4243
.await?;
44+
4345
if let Some(r) = saved_response {
4446
let status_code = StatusCode::from_u16(r.response_status_code.try_into()?)?;
4547
let mut response = HttpResponse::build(status_code);
@@ -53,7 +55,7 @@ pub async fn get_saved_response(
5355
}
5456

5557
pub async fn save_response(
56-
pool:&PgPool,
58+
muttransaction:Transaction<'static,Postgres>,
5759
idempotency_key: &IdempotencyKey,
5860
user_id: Uuid,
5961
http_response: HttpResponse,
@@ -72,25 +74,63 @@ pub async fn save_response(
7274
};
7375
sqlx::query_unchecked!(
7476
r#"
75-
INSERT INTO idempotency (
76-
user_id,
77-
idempotency_key,
78-
response_status_code,
79-
response_headers,
80-
response_body,
81-
created_at
82-
)
83-
VALUES (1,ドル 2,ドル 3,ドル 4,ドル 5,ドル now())
77+
UPDATE idempotency
78+
SET
79+
response_status_code = 3,ドル
80+
response_headers = 4,ドル
81+
response_body = 5ドル
82+
WHERE
83+
user_id = 1ドル AND
84+
idempotency_key = 2ドル
8485
"#,
8586
user_id,
8687
idempotency_key.as_ref(),
8788
status_code,
8889
headers,
8990
body.as_ref(),
9091
)
91-
.execute(pool)
92+
.execute(&mut transaction)
9293
.await?;
94+
transaction.commit().await?;
9395

9496
let http_response = response_head.set_body(body).map_into_boxed_body();
9597
Ok(http_response)
9698
}
99+
100+
pub enum NextAction {
101+
StartProcessing(Transaction<'static, Postgres>),
102+
ReturnSavedResponse(HttpResponse),
103+
}
104+
105+
pub async fn try_processing(
106+
pool: &PgPool,
107+
idempotency_key: &IdempotencyKey,
108+
user_id: Uuid,
109+
) -> Result<NextAction, anyhow::Error> {
110+
let mut transaction = pool.begin().await?;
111+
let n_inserted_rows = sqlx::query!(
112+
r#"
113+
INSERT INTO idempotency (
114+
user_id,
115+
idempotency_key,
116+
created_at
117+
)
118+
VALUES (1,ドル 2,ドル now())
119+
ON CONFLICT DO NOTHING
120+
"#,
121+
user_id,
122+
idempotency_key.as_ref()
123+
)
124+
.execute(&mut transaction)
125+
.await?
126+
.rows_affected();
127+
128+
if n_inserted_rows > 0 {
129+
Ok(NextAction::StartProcessing(transaction))
130+
} else {
131+
let saved_response = get_saved_response(pool, idempotency_key, user_id)
132+
.await?
133+
.ok_or_else(|| anyhow::anyhow!("We expected a saved response, we didn't find it"))?;
134+
Ok(NextAction::ReturnSavedResponse(saved_response))
135+
}
136+
}

‎src/routes/admin/newsletters/get.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ use actix_web::HttpResponse;
33
use actix_web_flash_messages::IncomingFlashMessages;
44
use std::fmt::Write;
55

6-
use crate::idempotency;
7-
86
pub async fn publish_newsletter_form(
97
flash_messages: IncomingFlashMessages,
108
) -> Result<HttpResponse, actix_web::Error> {

‎src/routes/admin/newsletters/post.rs

Lines changed: 74 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
use crate::authentication::UserId;
2-
use crate::domain::SubscriberEmail;
3-
use crate::email_client::EmailClient;
4-
use crate::idempotency::{get_saved_response, save_response};
52
use crate::idempotency::IdempotencyKey;
3+
use crate::idempotency::{get_saved_response, save_response, try_processing, NextAction};
64
use crate::utils::{e400, e500, see_other};
75
use actix_web::web::ReqData;
86
use actix_web::{web, HttpResponse};
97
use actix_web_flash_messages::FlashMessage;
108
use anyhow::Context;
11-
use sqlx::PgPool;
9+
use sqlx::{PgPool, Postgres, Transaction};
10+
use uuid::Uuid;
1211

1312
#[derive(serde::Deserialize)]
1413
pub struct FormData {
@@ -20,13 +19,12 @@ pub struct FormData {
2019

2120
#[tracing::instrument(
2221
name = "Publish a newsletter issue",
23-
skip(form, pool, email_client, user_id),
22+
skip_all,
2423
fields(user_id = %*user_id)
2524
)]
2625
pub async fn publish_newsletter(
2726
form: web::Form<FormData>,
2827
pool: web::Data<PgPool>,
29-
email_client: web::Data<EmailClient>,
3028
user_id: ReqData<UserId>,
3129
) -> Result<HttpResponse, actix_web::Error> {
3230
let user_id = user_id.into_inner();
@@ -37,65 +35,92 @@ pub async fn publish_newsletter(
3735
idempotency_key,
3836
} = form.0;
3937
let idempotency_key: IdempotencyKey = idempotency_key.try_into().map_err(e400)?;
38+
let mut transaction = match try_processing(&pool, &idempotency_key, *user_id)
39+
.await
40+
.map_err(e500)?
41+
{
42+
NextAction::StartProcessing(t) => t,
43+
NextAction::ReturnSavedResponse(saved_response) => {
44+
success_message().send();
45+
return Ok(saved_response);
46+
}
47+
};
48+
4049
if let Some(save_response) = get_saved_response(&pool, &idempotency_key, *user_id)
4150
.await
4251
.map_err(e500)?
4352
{
4453
return Ok(save_response);
4554
}
46-
let subscribers = get_confirmed_subscribers(&pool).await.map_err(e500)?;
47-
for subscriber in subscribers {
48-
match subscriber {
49-
Ok(subscriber) => {
50-
email_client
51-
.send_email(&subscriber.email, &title, &html_content, &text_content)
52-
.await
53-
.with_context(|| {
54-
format!("Failed to send newsletter issue to {}", subscriber.email)
55-
})
56-
.map_err(e500)?;
57-
}
55+
let issue_id = insert_newsletter_issue(&mut transaction, &title, &text_content, &html_content)
56+
.await
57+
.context("Failed to store newsletter issue details")
58+
.map_err(e500)?;
59+
enqueue_delivery_tasks(&mut transaction, issue_id)
60+
.await
61+
.context("Failed to enqueue delivery task")
62+
.map_err(e500)?;
5863

59-
Err(error) => {
60-
tracing::warn!(
61-
error.cause_chain = ?error,
62-
error.message = %error,
63-
"Skipping a confirmed subscriber. \
64-
Their stored contact details are invalid",
65-
);
66-
}
67-
}
68-
}
69-
FlashMessage::info("The newsletter issue has been published!").send();
7064
let response = see_other("/admin/newsletters");
71-
let response = save_response(&pool, &idempotency_key, *user_id, response)
65+
let response = save_response(transaction, &idempotency_key, *user_id, response)
7266
.await
7367
.map_err(e500)?;
68+
success_message().send();
7469
Ok(response)
7570
}
7671

77-
struct ConfirmedSubscriber {
78-
email: SubscriberEmail,
72+
fn success_message() -> FlashMessage {
73+
FlashMessage::info("The newsletter issue has been accepted - \
74+
emails will go out shortly.",)
75+
}
76+
77+
#[tracing::instrument(skip_all)]
78+
async fn insert_newsletter_issue(
79+
transaction: &mut Transaction<'_, Postgres>,
80+
title: &str,
81+
text_content: &str,
82+
html_content: &str,
83+
) -> Result<Uuid, sqlx::Error> {
84+
let newsletter_issue_id = Uuid::new_v4();
85+
sqlx::query!(
86+
r#"
87+
INSERT INTO newsletter_issues (
88+
newsletter_issue_id,
89+
title,
90+
text_content,
91+
html_content,
92+
published_at
93+
)
94+
VALUES (1,ドル 2,ドル 3,ドル 4,ドル now())
95+
"#,
96+
newsletter_issue_id,
97+
title,
98+
text_content,
99+
html_content
100+
)
101+
.execute(transaction)
102+
.await?;
103+
Ok(newsletter_issue_id)
79104
}
80105

81-
#[tracing::instrument(name = "Get confirmed subscribers", skip(pool))]
82-
async fn get_confirmed_subscribers(
83-
pool: &PgPool,
84-
) -> Result<Vec<Result<ConfirmedSubscriber, anyhow::Error>>, anyhow::Error> {
85-
let confirmed_subscribers = sqlx::query!(
106+
#[tracing::instrument(skip_all)]
107+
async fn enqueue_delivery_tasks(
108+
transaction: &mut Transaction<'_, Postgres>,
109+
newsletter_issue_id: Uuid,
110+
) -> Result<(), sqlx::Error> {
111+
sqlx::query!(
86112
r#"
87-
SELECT email
88-
FROM subscriptions
89-
WHERE status = 'confirmed'
113+
INSERT INTO issue_delivery_queue (
114+
newsletter_issue_id,
115+
subscriber_email
116+
)
117+
SELECT 1,ドル email
118+
FROM subscriptions
119+
WHERE status = 'confirmed'
90120
"#,
121+
newsletter_issue_id,
91122
)
92-
.fetch_all(pool)
93-
.await?
94-
.into_iter()
95-
.map(|r| match SubscriberEmail::parse(r.email) {
96-
Ok(email) => Ok(ConfirmedSubscriber { email }),
97-
Err(error) => Err(anyhow::anyhow!(error)),
98-
})
99-
.collect();
100-
Ok(confirmed_subscribers)
123+
.execute(transaction)
124+
.await?;
125+
Ok(())
101126
}

‎src/utils.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use actix_web::http::header::LOCATION;
2-
use actix_web::http::StatusCode;
32
use actix_web::HttpResponse;
43

54
// Return a 400 with the user-representation of the validation error as body.

‎tests/api/helpers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use argon2::{Argon2, PasswordHasher};
33
use once_cell::sync::Lazy;
44
use robust_rust::{
55
configuration::{get_configuration, DatabaseSettings},
6+
email_client::EmailClient,
67
startup::{get_connection_pool, Application},
78
telemetry::{get_subscriber, init_subscriber},
8-
email_client::EmailClient,
99
};
1010
use sqlx::{Connection, Executor, PgConnection, PgPool};
1111
use uuid::Uuid;

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /