Use the count cache (#312)
* Use the count cache By default read from the count cache - if there is no value there, then do a full COUNT. The cache will be filled when the user posts up some more history * clean up server db error handling Co-authored-by: Conrad Ludgate <conrad.ludgate@truelayer.com>
This commit is contained in:
parent
6e11b8e0ed
commit
ed4e07d2e6
3 changed files with 63 additions and 54 deletions
|
@ -1,8 +1,7 @@
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use eyre::{eyre, Result};
|
use sqlx::{postgres::PgPoolOptions, Result};
|
||||||
use sqlx::postgres::PgPoolOptions;
|
|
||||||
|
|
||||||
use crate::settings::HISTORY_PAGE_SIZE;
|
use crate::settings::HISTORY_PAGE_SIZE;
|
||||||
|
|
||||||
|
@ -25,6 +24,7 @@ pub trait Database {
|
||||||
async fn add_user(&self, user: &NewUser) -> Result<i64>;
|
async fn add_user(&self, user: &NewUser) -> Result<i64>;
|
||||||
|
|
||||||
async fn count_history(&self, user: &User) -> Result<i64>;
|
async fn count_history(&self, user: &User) -> Result<i64>;
|
||||||
|
async fn count_history_cached(&self, user: &User) -> Result<i64>;
|
||||||
|
|
||||||
async fn count_history_range(
|
async fn count_history_range(
|
||||||
&self,
|
&self,
|
||||||
|
@ -63,7 +63,7 @@ pub struct Postgres {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Postgres {
|
impl Postgres {
|
||||||
pub async fn new(uri: &str) -> Result<Self, sqlx::Error> {
|
pub async fn new(uri: &str) -> Result<Self> {
|
||||||
let pool = PgPoolOptions::new()
|
let pool = PgPoolOptions::new()
|
||||||
.max_connections(100)
|
.max_connections(100)
|
||||||
.connect(uri)
|
.connect(uri)
|
||||||
|
@ -78,52 +78,36 @@ impl Postgres {
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl Database for Postgres {
|
impl Database for Postgres {
|
||||||
async fn get_session(&self, token: &str) -> Result<Session> {
|
async fn get_session(&self, token: &str) -> Result<Session> {
|
||||||
let res: Option<Session> =
|
sqlx::query_as::<_, Session>("select * from sessions where token = $1")
|
||||||
sqlx::query_as::<_, Session>("select * from sessions where token = $1")
|
.bind(token)
|
||||||
.bind(token)
|
.fetch_one(&self.pool)
|
||||||
.fetch_optional(&self.pool)
|
.await
|
||||||
.await?;
|
|
||||||
|
|
||||||
if let Some(s) = res {
|
|
||||||
Ok(s)
|
|
||||||
} else {
|
|
||||||
Err(eyre!("could not find session"))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_user(&self, username: &str) -> Result<User> {
|
async fn get_user(&self, username: &str) -> Result<User> {
|
||||||
let res: Option<User> =
|
sqlx::query_as::<_, User>("select * from users where username = $1")
|
||||||
sqlx::query_as::<_, User>("select * from users where username = $1")
|
.bind(username)
|
||||||
.bind(username)
|
.fetch_one(&self.pool)
|
||||||
.fetch_optional(&self.pool)
|
.await
|
||||||
.await?;
|
|
||||||
|
|
||||||
if let Some(u) = res {
|
|
||||||
Ok(u)
|
|
||||||
} else {
|
|
||||||
Err(eyre!("could not find user"))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_session_user(&self, token: &str) -> Result<User> {
|
async fn get_session_user(&self, token: &str) -> Result<User> {
|
||||||
let res: Option<User> = sqlx::query_as::<_, User>(
|
sqlx::query_as::<_, User>(
|
||||||
"select * from users
|
"select * from users
|
||||||
inner join sessions
|
inner join sessions
|
||||||
on users.id = sessions.user_id
|
on users.id = sessions.user_id
|
||||||
and sessions.token = $1",
|
and sessions.token = $1",
|
||||||
)
|
)
|
||||||
.bind(token)
|
.bind(token)
|
||||||
.fetch_optional(&self.pool)
|
.fetch_one(&self.pool)
|
||||||
.await?;
|
.await
|
||||||
|
|
||||||
if let Some(u) = res {
|
|
||||||
Ok(u)
|
|
||||||
} else {
|
|
||||||
Err(eyre!("could not find user"))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn count_history(&self, user: &User) -> Result<i64> {
|
async fn count_history(&self, user: &User) -> Result<i64> {
|
||||||
|
// The cache is new, and the user might not yet have a cache value.
|
||||||
|
// They will have one as soon as they post up some new history, but handle that
|
||||||
|
// edge case.
|
||||||
|
|
||||||
let res: (i64,) = sqlx::query_as(
|
let res: (i64,) = sqlx::query_as(
|
||||||
"select count(1) from history
|
"select count(1) from history
|
||||||
where user_id = $1",
|
where user_id = $1",
|
||||||
|
@ -135,6 +119,18 @@ impl Database for Postgres {
|
||||||
Ok(res.0)
|
Ok(res.0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn count_history_cached(&self, user: &User) -> Result<i64> {
|
||||||
|
let res: (i64,) = sqlx::query_as(
|
||||||
|
"select total from total_history_count_user
|
||||||
|
where user_id = $1",
|
||||||
|
)
|
||||||
|
.bind(user.id)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(res.0)
|
||||||
|
}
|
||||||
|
|
||||||
async fn count_history_range(
|
async fn count_history_range(
|
||||||
&self,
|
&self,
|
||||||
user: &User,
|
user: &User,
|
||||||
|
@ -300,17 +296,10 @@ impl Database for Postgres {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_user_session(&self, u: &User) -> Result<Session> {
|
async fn get_user_session(&self, u: &User) -> Result<Session> {
|
||||||
let res: Option<Session> =
|
sqlx::query_as::<_, Session>("select * from sessions where user_id = $1")
|
||||||
sqlx::query_as::<_, Session>("select * from sessions where user_id = $1")
|
.bind(u.id)
|
||||||
.bind(u.id)
|
.fetch_one(&self.pool)
|
||||||
.fetch_optional(&self.pool)
|
.await
|
||||||
.await?;
|
|
||||||
|
|
||||||
if let Some(s) = res {
|
|
||||||
Ok(s)
|
|
||||||
} else {
|
|
||||||
Err(eyre!("could not find session"))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn oldest_history(&self, user: &User) -> Result<History> {
|
async fn oldest_history(&self, user: &User) -> Result<History> {
|
||||||
|
|
|
@ -13,10 +13,17 @@ pub async fn count(
|
||||||
user: User,
|
user: User,
|
||||||
db: Extension<Postgres>,
|
db: Extension<Postgres>,
|
||||||
) -> Result<Json<CountResponse>, ErrorResponseStatus<'static>> {
|
) -> Result<Json<CountResponse>, ErrorResponseStatus<'static>> {
|
||||||
match db.count_history(&user).await {
|
match db.count_history_cached(&user).await {
|
||||||
|
// By default read out the cached value
|
||||||
Ok(count) => Ok(Json(CountResponse { count })),
|
Ok(count) => Ok(Json(CountResponse { count })),
|
||||||
Err(_) => Err(ErrorResponse::reply("failed to query history count")
|
|
||||||
.with_status(StatusCode::INTERNAL_SERVER_ERROR)),
|
// If that fails, fallback on a full COUNT. Cache is built on a POST
|
||||||
|
// only
|
||||||
|
Err(_) => match db.count_history(&user).await {
|
||||||
|
Ok(count) => Ok(Json(CountResponse { count })),
|
||||||
|
Err(_) => Err(ErrorResponse::reply("failed to query history count")
|
||||||
|
.with_status(StatusCode::INTERNAL_SERVER_ERROR)),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,10 +32,15 @@ pub async fn get(
|
||||||
) -> Result<Json<UserResponse>, ErrorResponseStatus<'static>> {
|
) -> Result<Json<UserResponse>, ErrorResponseStatus<'static>> {
|
||||||
let user = match db.get_user(username.as_ref()).await {
|
let user = match db.get_user(username.as_ref()).await {
|
||||||
Ok(user) => user,
|
Ok(user) => user,
|
||||||
Err(e) => {
|
Err(sqlx::Error::RowNotFound) => {
|
||||||
debug!("user not found: {}", e);
|
debug!("user not found: {}", username);
|
||||||
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
|
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
|
||||||
}
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("database error: {}", err);
|
||||||
|
return Err(ErrorResponse::reply("database error")
|
||||||
|
.with_status(StatusCode::INTERNAL_SERVER_ERROR));
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(Json(UserResponse {
|
Ok(Json(UserResponse {
|
||||||
|
@ -96,20 +101,28 @@ pub async fn login(
|
||||||
) -> Result<Json<LoginResponse>, ErrorResponseStatus<'static>> {
|
) -> Result<Json<LoginResponse>, ErrorResponseStatus<'static>> {
|
||||||
let user = match db.get_user(login.username.borrow()).await {
|
let user = match db.get_user(login.username.borrow()).await {
|
||||||
Ok(u) => u,
|
Ok(u) => u,
|
||||||
|
Err(sqlx::Error::RowNotFound) => {
|
||||||
|
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("failed to get user {}: {}", login.username.clone(), e);
|
error!("failed to get user {}: {}", login.username.clone(), e);
|
||||||
|
|
||||||
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
|
return Err(ErrorResponse::reply("database error")
|
||||||
|
.with_status(StatusCode::INTERNAL_SERVER_ERROR));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let session = match db.get_user_session(&user).await {
|
let session = match db.get_user_session(&user).await {
|
||||||
Ok(u) => u,
|
Ok(u) => u,
|
||||||
Err(e) => {
|
Err(sqlx::Error::RowNotFound) => {
|
||||||
error!("failed to get session for {}: {}", login.username, e);
|
debug!("user session not found for user id={}", user.id);
|
||||||
|
|
||||||
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
|
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
|
||||||
}
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("database error for user {}: {}", login.username, err);
|
||||||
|
return Err(ErrorResponse::reply("database error")
|
||||||
|
.with_status(StatusCode::INTERNAL_SERVER_ERROR));
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let verified = verify_str(user.password.as_str(), login.password.borrow());
|
let verified = verify_str(user.password.as_str(), login.password.borrow());
|
||||||
|
|
Loading…
Reference in a new issue