Add history deletion (#791)

* Drop events. I'd still like to do them, but differently

* Start adding delete api stuff

* Set mailmap

* Delete delete delete

* Fix tests

* Make clippy happy
This commit is contained in:
Ellie Huxtable 2023-03-20 09:26:54 +00:00 committed by GitHub
parent edcd477153
commit dcd77749dd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 249 additions and 154 deletions

View file

@ -1 +1,2 @@
networkException <git@nwex.de> <github@nwex.de>
Violet Shreve <github@shreve.io> <jacob@shreve.io>

View file

@ -0,0 +1,2 @@
-- Add migration script here
drop table events;

View file

@ -0,0 +1,2 @@
-- Add migration script here
alter table history add column deleted_at integer;

View file

@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::collections::HashSet;
use chrono::Utc;
use eyre::{bail, Result};
@ -9,8 +10,8 @@ use reqwest::{
use sodiumoxide::crypto::secretbox;
use atuin_common::api::{
AddHistoryRequest, CountResponse, ErrorResponse, IndexResponse, LoginRequest, LoginResponse,
RegisterResponse, SyncHistoryResponse,
AddHistoryRequest, CountResponse, DeleteHistoryRequest, ErrorResponse, IndexResponse,
LoginRequest, LoginResponse, RegisterResponse, StatusResponse, SyncHistoryResponse,
};
use semver::Version;
@ -138,11 +139,27 @@ impl<'a> Client<'a> {
Ok(count.count)
}
/// Fetch the remote sync status (history count + deleted client ids).
/// Bails with a hint when the server responds non-OK, which usually
/// means the session token is missing or invalid.
pub async fn status(&self) -> Result<StatusResponse> {
    let url = Url::parse(format!("{}/sync/status", self.sync_addr).as_str())?;

    let resp = self.client.get(url).send().await?;
    if resp.status() != StatusCode::OK {
        bail!("failed to get status (are you logged in?)");
    }

    Ok(resp.json::<StatusResponse>().await?)
}
pub async fn get_history(
&self,
sync_ts: chrono::DateTime<Utc>,
history_ts: chrono::DateTime<Utc>,
host: Option<String>,
deleted: HashSet<String>,
) -> Result<Vec<History>> {
let host = match host {
None => hash_str(&format!("{}:{}", whoami::hostname(), whoami::username())),
@ -163,8 +180,17 @@ impl<'a> Client<'a> {
let history = history
.history
.iter()
// TODO: handle deletion earlier in this chain
.map(|h| serde_json::from_str(h).expect("invalid base64"))
.map(|h| decrypt(&h, &self.key).expect("failed to decrypt history! check your key"))
.map(|mut h| {
if deleted.contains(&h.id) {
h.deleted_at = Some(chrono::Utc::now());
h.command = String::from("");
}
h
})
.collect();
Ok(history)
@ -178,4 +204,17 @@ impl<'a> Client<'a> {
Ok(())
}
pub async fn delete_history(&self, h: History) -> Result<()> {
let url = format!("{}/history", self.sync_addr);
let url = Url::parse(url.as_str())?;
self.client
.delete(url)
.json(&DeleteHistoryRequest { client_id: h.id })
.send()
.await?;
Ok(())
}
}

View file

@ -14,7 +14,6 @@ use sqlx::{
};
use super::{
event::{Event, EventType},
history::History,
ordering,
settings::{FilterMode, SearchMode},
@ -62,13 +61,14 @@ pub trait Database: Send + Sync {
async fn update(&self, h: &History) -> Result<()>;
async fn history_count(&self) -> Result<i64>;
async fn event_count(&self) -> Result<i64>;
async fn merge_events(&self) -> Result<i64>;
async fn first(&self) -> Result<History>;
async fn last(&self) -> Result<History>;
async fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>>;
async fn delete(&self, mut h: History) -> Result<()>;
async fn deleted(&self) -> Result<Vec<History>>;
// Yes I know, it's a lot.
// Could maybe break it down to a searchparams struct or smth but that feels a little... pointless.
// Been debating maybe a DSL for search? eg "before:time limit:1 the query"
@ -126,31 +126,10 @@ impl Sqlite {
Ok(())
}
async fn save_event(tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, e: &Event) -> Result<()> {
let event_type = match e.event_type {
EventType::Create => "create",
EventType::Delete => "delete",
};
sqlx::query(
"insert or ignore into events(id, timestamp, hostname, event_type, history_id)
values(?1, ?2, ?3, ?4, ?5)",
)
.bind(e.id.as_str())
.bind(e.timestamp.timestamp_nanos())
.bind(e.hostname.as_str())
.bind(event_type)
.bind(e.history_id.as_str())
.execute(tx)
.await?;
Ok(())
}
async fn save_raw(tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, h: &History) -> Result<()> {
sqlx::query(
"insert or ignore into history(id, timestamp, duration, exit, command, cwd, session, hostname)
values(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
"insert or ignore into history(id, timestamp, duration, exit, command, cwd, session, hostname, deleted_at)
values(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
)
.bind(h.id.as_str())
.bind(h.timestamp.timestamp_nanos())
@ -160,6 +139,7 @@ impl Sqlite {
.bind(h.cwd.as_str())
.bind(h.session.as_str())
.bind(h.hostname.as_str())
.bind(h.deleted_at.map(|t|t.timestamp_nanos()))
.execute(tx)
.await?;
@ -167,6 +147,8 @@ impl Sqlite {
}
fn query_history(row: SqliteRow) -> History {
let deleted_at: Option<i64> = row.get("deleted_at");
History {
id: row.get("id"),
timestamp: Utc.timestamp_nanos(row.get("timestamp")),
@ -176,6 +158,7 @@ impl Sqlite {
cwd: row.get("cwd"),
session: row.get("session"),
hostname: row.get("hostname"),
deleted_at: deleted_at.map(|t| Utc.timestamp_nanos(t)),
}
}
}
@ -184,11 +167,8 @@ impl Sqlite {
impl Database for Sqlite {
async fn save(&mut self, h: &History) -> Result<()> {
debug!("saving history to sqlite");
let event = Event::new_create(h);
let mut tx = self.pool.begin().await?;
Self::save_raw(&mut tx, h).await?;
Self::save_event(&mut tx, &event).await?;
tx.commit().await?;
Ok(())
@ -200,9 +180,7 @@ impl Database for Sqlite {
let mut tx = self.pool.begin().await?;
for i in h {
let event = Event::new_create(i);
Self::save_raw(&mut tx, i).await?;
Self::save_event(&mut tx, &event).await?;
}
tx.commit().await?;
@ -227,7 +205,7 @@ impl Database for Sqlite {
sqlx::query(
"update history
set timestamp = ?2, duration = ?3, exit = ?4, command = ?5, cwd = ?6, session = ?7, hostname = ?8
set timestamp = ?2, duration = ?3, exit = ?4, command = ?5, cwd = ?6, session = ?7, hostname = ?8, deleted_at = ?9
where id = ?1",
)
.bind(h.id.as_str())
@ -238,6 +216,7 @@ impl Database for Sqlite {
.bind(h.cwd.as_str())
.bind(h.session.as_str())
.bind(h.hostname.as_str())
.bind(h.deleted_at.map(|t|t.timestamp_nanos()))
.execute(&self.pool)
.await?;
@ -338,49 +317,15 @@ impl Database for Sqlite {
Ok(res)
}
async fn event_count(&self) -> Result<i64> {
let res: i64 = sqlx::query_scalar("select count(1) from events")
.fetch_one(&self.pool)
/// Load every local history entry that has been tombstoned
/// (deleted_at set), so the sync uploader can push the deletions.
async fn deleted(&self) -> Result<Vec<History>> {
    let tombstones = sqlx::query("select * from history where deleted_at is not null")
        .map(Self::query_history)
        .fetch_all(&self.pool)
        .await?;

    Ok(tombstones)
}
// Ensure that we have correctly merged the event log
async fn merge_events(&self) -> Result<i64> {
// Ensure that we do not have more history locally than we do events.
// We can think of history as the merged log of events. There should never be more history than
// events, and the only time this could happen is if someone is upgrading from an old Atuin version
// from before we stored events.
let history_count = self.history_count().await?;
let event_count = self.event_count().await?;
if history_count > event_count {
// pass an empty context, because with global listing we don't care
let no_context = Context {
cwd: String::from(""),
session: String::from(""),
hostname: String::from(""),
};
// We're just gonna load everything into memory here. That sucks, I know, sorry.
// But also even if you have a LOT of history that should be fine, and we're only going to be doing this once EVER.
let all_the_history = self
.list(FilterMode::Global, &no_context, None, false)
.await?;
let mut tx = self.pool.begin().await?;
for i in all_the_history.iter() {
// A CREATE for every single history item is to be expected.
let event = Event::new_create(i);
Self::save_event(&mut tx, &event).await?;
}
tx.commit().await?;
}
Ok(0)
}
async fn history_count(&self) -> Result<i64> {
let res: (i64,) = sqlx::query_as("select count(1) from history")
.fetch_one(&self.pool)
@ -528,6 +473,18 @@ impl Database for Sqlite {
Ok(res)
}
// deleted_at doesn't mean the actual time that the user deleted it,
// but the time that the system marks it as deleted
async fn delete(&self, mut h: History) -> Result<()> {
    // Tombstone the entry: wipe the command text, stamp the deletion
    // time, then persist through the normal update path.
    h.command = String::from("");
    h.deleted_at = Some(chrono::Utc::now());

    self.update(&h).await?;

    Ok(())
}
}
#[cfg(test)]
@ -585,6 +542,7 @@ mod test {
1,
Some("beep boop".to_string()),
Some("booop".to_string()),
None,
);
db.save(&history).await
}

View file

@ -124,6 +124,7 @@ mod test {
1,
Some("beep boop".to_string()),
Some("booop".to_string()),
None,
);
let e1 = encrypt(&history, &key1).unwrap();

View file

@ -1,47 +0,0 @@
use chrono::Utc;
use serde::{Deserialize, Serialize};
use crate::history::History;
use atuin_common::utils::uuid_v4;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum EventType {
Create,
Delete,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow)]
pub struct Event {
pub id: String,
pub timestamp: chrono::DateTime<Utc>,
pub hostname: String,
pub event_type: EventType,
pub history_id: String,
}
impl Event {
    /// Build a Create event mirroring a saved history entry.
    /// Carries the history's own timestamp and hostname so the event log
    /// replays in the same order as the history itself.
    pub fn new_create(history: &History) -> Event {
        Event {
            id: uuid_v4(),
            timestamp: history.timestamp,
            hostname: history.hostname.clone(),
            event_type: EventType::Create,
            history_id: history.id.clone(),
        }
    }

    /// Build a Delete event for the given history id, stamped with the
    /// current host and time (the deletion happens here and now, unlike
    /// creation which reuses the history's metadata).
    pub fn new_delete(history_id: &str) -> Event {
        let hostname = format!("{}:{}", whoami::hostname(), whoami::username());

        Event {
            id: uuid_v4(),
            timestamp: chrono::Utc::now(),
            hostname,
            // BUG FIX: this previously emitted EventType::Create, which would
            // have recorded deletions as creations in the event log.
            event_type: EventType::Delete,
            history_id: history_id.to_string(),
        }
    }
}

View file

@ -16,9 +16,11 @@ pub struct History {
pub cwd: String,
pub session: String,
pub hostname: String,
pub deleted_at: Option<chrono::DateTime<Utc>>,
}
impl History {
#[allow(clippy::too_many_arguments)]
pub fn new(
timestamp: chrono::DateTime<Utc>,
command: String,
@ -27,6 +29,7 @@ impl History {
duration: i64,
session: Option<String>,
hostname: Option<String>,
deleted_at: Option<chrono::DateTime<Utc>>,
) -> Self {
let session = session
.or_else(|| env::var("ATUIN_SESSION").ok())
@ -43,6 +46,7 @@ impl History {
duration,
session,
hostname,
deleted_at,
}
}

View file

@ -78,6 +78,7 @@ impl Importer for Bash {
-1,
None,
None,
None,
);
h.push(entry).await?;
next_timestamp += timestamp_increment;

View file

@ -80,6 +80,7 @@ impl Importer for Fish {
-1,
None,
None,
None,
))
.await?;
}
@ -115,6 +116,7 @@ impl Importer for Fish {
-1,
None,
None,
None,
))
.await?;
}

View file

@ -131,6 +131,7 @@ impl Importer for Resh {
cwd: entry.pwd,
session: uuid_v4(),
hostname: entry.host,
deleted_at: None,
})
.await?;
}

View file

@ -86,6 +86,7 @@ impl Importer for Zsh {
-1,
None,
None,
None,
))
.await?;
}
@ -119,6 +120,7 @@ fn parse_extended(line: &str, counter: i64) -> History {
duration,
None,
None,
None,
)
}

View file

@ -80,6 +80,7 @@ impl From<HistDbEntry> for History {
.trim_end()
.to_string(),
),
None,
)
}
}

View file

@ -11,7 +11,6 @@ pub mod encryption;
pub mod sync;
pub mod database;
pub mod event;
pub mod history;
pub mod import;
pub mod ordering;

View file

@ -1,4 +1,6 @@
use std::collections::HashSet;
use std::convert::TryInto;
use std::iter::FromIterator;
use chrono::prelude::*;
use eyre::Result;
@ -37,7 +39,11 @@ async fn sync_download(
) -> Result<(i64, i64)> {
debug!("starting sync download");
let remote_count = client.count().await?;
let remote_status = client.status().await?;
let remote_count = remote_status.count;
// useful to ensure we don't even save something that hasn't yet been synced + deleted
let remote_deleted = HashSet::from_iter(remote_status.deleted.clone());
let initial_local = db.history_count().await?;
let mut local_count = initial_local;
@ -54,7 +60,12 @@ async fn sync_download(
while remote_count > local_count {
let page = client
.get_history(last_sync, last_timestamp, host.clone())
.get_history(
last_sync,
last_timestamp,
host.clone(),
remote_deleted.clone(),
)
.await?;
db.save_bulk(&page).await?;
@ -81,6 +92,13 @@ async fn sync_download(
}
}
for i in remote_status.deleted {
// we will update the stored history to have this data
// pretty much everything can be nullified
let h = db.load(i.as_str()).await?;
db.delete(h).await?;
}
Ok((local_count - initial_local, local_count))
}
@ -136,12 +154,17 @@ async fn sync_upload(
debug!("upload cursor: {:?}", cursor);
}
let deleted = db.deleted().await?;
for i in deleted {
info!("deleting {} on remote", i.id);
client.delete_history(i).await?;
}
Ok(())
}
pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> {
db.merge_events().await?;
let client = api_client::Client::new(
&settings.sync_address,
&settings.session_token,

View file

@ -66,31 +66,18 @@ pub struct IndexResponse {
pub version: String,
}
// Doubled up with the history sync stuff, because atm we need to support BOTH.
// People are still running old clients, and in some cases _very_ old clients.
#[derive(Debug, Serialize, Deserialize)]
pub enum AddEventRequest {
Create(AddHistoryRequest),
Delete {
id: String,
timestamp: chrono::DateTime<Utc>,
hostname: chrono::DateTime<Utc>,
// When we delete a history item, we push up an event marking its client
// id as being deleted.
history_id: String,
},
pub struct StatusResponse {
pub count: i64,
pub deleted: Vec<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncEventRequest {
pub sync_ts: chrono::DateTime<chrono::FixedOffset>,
pub event_ts: chrono::DateTime<chrono::FixedOffset>,
pub host: String,
pub struct DeleteHistoryRequest {
pub client_id: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncEventResponse {
pub events: Vec<String>,
pub struct MessageResponse {
pub message: String,
}

View file

@ -0,0 +1,2 @@
-- Add migration script here
drop table events;

View file

@ -0,0 +1,5 @@
-- Add migration script here
alter table history add column if not exists deleted_at timestamp;
-- queries will all select the ids of a given user's deleted history entries
create index if not exists history_deleted_index on history(client_id, user_id, deleted_at);

View file

@ -4,6 +4,9 @@ use async_trait::async_trait;
use chrono::{Datelike, TimeZone};
use chronoutil::RelativeDuration;
use sqlx::{postgres::PgPoolOptions, Result};
use sqlx::Row;
use tracing::{debug, instrument, warn};
use super::{
@ -28,6 +31,9 @@ pub trait Database {
async fn count_history(&self, user: &User) -> Result<i64>;
async fn count_history_cached(&self, user: &User) -> Result<i64>;
async fn delete_history(&self, user: &User, id: String) -> Result<()>;
async fn deleted_history(&self, user: &User) -> Result<Vec<String>>;
async fn count_history_range(
&self,
user: &User,
@ -141,6 +147,46 @@ impl Database for Postgres {
Ok(res.0 as i64)
}
/// Soft-delete a user's history entry by client id.
///
/// The row is kept and deleted_at is set; other clients observe the
/// tombstone via the status endpoint. Only rows whose deleted_at is still
/// null are touched, so repeating the delete preserves the original
/// deletion timestamp.
async fn delete_history(&self, user: &User, id: String) -> Result<()> {
    sqlx::query(
        "update history
        set deleted_at = $3
        where user_id = $1
        and client_id = $2
        and deleted_at is null", // don't just keep setting it
    )
    .bind(user.id)
    .bind(id)
    .bind(chrono::Utc::now().naive_utc())
    // an UPDATE returns no rows — execute, don't fetch_all
    .execute(&self.pool)
    .await?;

    Ok(())
}
#[instrument(skip_all)]
// Return the client ids of every history entry this user has soft-deleted
// (rows where deleted_at is set by delete_history). Clients use this list
// to tombstone the matching local entries during sync.
async fn deleted_history(&self, user: &User) -> Result<Vec<String>> {
    let res = sqlx::query(
        "select client_id from history
        where user_id = $1
        and deleted_at is not null",
    )
    .bind(user.id)
    .fetch_all(&self.pool)
    .await?;

    // project each row down to its client_id string
    let res = res
        .iter()
        .map(|row| row.get::<String, _>("client_id"))
        .collect();

    Ok(res)
}
#[instrument(skip_all)]
async fn count_history_range(
&self,

View file

@ -74,6 +74,28 @@ pub async fn list<DB: Database>(
Ok(Json(SyncHistoryResponse { history }))
}
#[instrument(skip_all, fields(user.id = user.id))]
// DELETE /history: soft-delete one of the authenticated user's history
// entries, addressed by its client-generated id.
pub async fn delete<DB: Database>(
    user: User,
    state: State<AppState<DB>>,
    Json(req): Json<DeleteHistoryRequest>,
) -> Result<Json<MessageResponse>, ErrorResponseStatus<'static>> {
    let db = &state.0.database;

    // client_id is the ID of the history as generated by the client;
    // the server keeps its own separate row id for the same entry.
    let deleted = db.delete_history(&user, req.client_id).await;

    if let Err(e) = deleted {
        error!("failed to delete history: {}", e);
        return Err(ErrorResponse::reply("failed to delete history")
            .with_status(StatusCode::INTERNAL_SERVER_ERROR));
    }

    Ok(Json(MessageResponse {
        message: String::from("deleted OK"),
    }))
}
#[instrument(skip_all, fields(user.id = user.id))]
pub async fn add<DB: Database>(
user: User,

View file

@ -2,6 +2,7 @@ use atuin_common::api::{ErrorResponse, IndexResponse};
use axum::{response::IntoResponse, Json};
pub mod history;
pub mod status;
pub mod user;
const VERSION: &str = env!("CARGO_PKG_VERSION");

View file

@ -0,0 +1,29 @@
use axum::{extract::State, Json};
use http::StatusCode;
use tracing::instrument;
use super::{ErrorResponse, ErrorResponseStatus, RespExt};
use crate::{database::Database, models::User, router::AppState};
use atuin_common::api::*;
#[instrument(skip_all, fields(user.id = user.id))]
/// GET /sync/status: report the user's cached history count together with
/// the client ids of all their deleted entries.
pub async fn status<DB: Database>(
    user: User,
    state: State<AppState<DB>>,
) -> Result<Json<StatusResponse>, ErrorResponseStatus<'static>> {
    let db = &state.0.database;

    // Either query failing collapses to the same 500, exactly as before.
    match (
        db.count_history_cached(&user).await,
        db.deleted_history(&user).await,
    ) {
        (Ok(count), Ok(deleted)) => Ok(Json(StatusResponse { count, deleted })),
        _ => Err(ErrorResponse::reply("failed to query history count")
            .with_status(StatusCode::INTERNAL_SERVER_ERROR)),
    }
}

View file

@ -2,7 +2,7 @@ use async_trait::async_trait;
use axum::{
extract::FromRequestParts,
response::IntoResponse,
routing::{get, post},
routing::{delete, get, post},
Router,
};
use eyre::Result;
@ -68,7 +68,9 @@ pub fn router<DB: Database + Clone + Send + Sync + 'static>(
.route("/sync/count", get(handlers::history::count))
.route("/sync/history", get(handlers::history::list))
.route("/sync/calendar/:focus", get(handlers::history::calendar))
.route("/sync/status", get(handlers::status::status))
.route("/history", post(handlers::history::add))
.route("/history", delete(handlers::history::delete))
.route("/user/:username", get(handlers::user::get))
.route("/register", post(handlers::user::register))
.route("/login", post(handlers::user::login));

View file

@ -184,7 +184,7 @@ impl Cmd {
// store whatever is ran, than to throw an error to the terminal
let cwd = utils::get_current_dir();
let h = History::new(chrono::Utc::now(), command, cwd, -1, -1, None, None);
let h = History::new(chrono::Utc::now(), command, cwd, -1, -1, None, None, None);
// print the ID
// we use this as the key for calling end

View file

@ -76,6 +76,10 @@ pub struct Cmd {
#[arg(long)]
cmd_only: bool,
// Delete anything matching this query. Will not print out the match
#[arg(long)]
delete: bool,
/// Available variables: {command}, {directory}, {duration}, {user}, {host} and {time}.
/// Example: --format "{time} - [{duration}] - {directory}$\t{command}"
#[arg(long, short)]
@ -100,12 +104,10 @@ impl Cmd {
let list_mode = ListMode::from_flags(self.human, self.cmd_only);
let entries = run_non_interactive(
settings,
list_mode,
self.cwd,
self.exit,
self.exclude_exit,
self.exclude_cwd,
self.format,
self.before,
self.after,
self.limit,
@ -113,9 +115,22 @@ impl Cmd {
db,
)
.await?;
if entries == 0 {
if entries.is_empty() {
std::process::exit(1)
}
// if we aren't deleting, print it all
if self.delete {
// delete it
// it only took me _years_ to add this
// sorry
for entry in entries {
db.delete(entry).await?;
}
} else {
super::history::print_list(&entries, list_mode, self.format.as_deref());
}
};
Ok(())
}
@ -126,18 +141,16 @@ impl Cmd {
#[allow(clippy::too_many_arguments)]
async fn run_non_interactive(
settings: &Settings,
list_mode: ListMode,
cwd: Option<String>,
exit: Option<i64>,
exclude_exit: Option<i64>,
exclude_cwd: Option<String>,
format: Option<String>,
before: Option<String>,
after: Option<String>,
limit: Option<i64>,
query: &[String],
db: &mut impl Database,
) -> Result<usize> {
) -> Result<Vec<History>> {
let dir = if cwd.as_deref() == Some(".") {
Some(utils::get_current_dir())
} else {
@ -202,6 +215,5 @@ async fn run_non_interactive(
.map(std::borrow::ToOwned::to_owned)
.collect();
super::history::print_list(&results, list_mode, format.as_deref());
Ok(results.len())
Ok(results)
}