From e297b98f721bf32d8d4331677eefe49823db32b9 Mon Sep 17 00:00:00 2001 From: Ellie Huxtable Date: Fri, 4 Nov 2022 09:08:20 +0000 Subject: [PATCH] Add local event log storage (#390) * Add event data structures This adds the data structures required to start syncing events, rather than syncing history directly. Adjust event Fix Add event data structure to client * Add server event table sql * Add client event table migration Adjust migration * Insert into event table from client * Add event merge function Right now this just ensures we have the right amount of events given the history we have BUT it will also be used to merge CREATE/DELETE events, resulting in history being deleted :) * Make CI happy * Adjust * we don't limit history length any more * Update atuin-client/src/database.rs Co-authored-by: Conrad Ludgate * fix usage * Fix typo * New Rust, new clippy stuff Co-authored-by: Conrad Ludgate --- .../20220505083406_create-events.sql | 11 +++ atuin-client/src/database.rs | 73 ++++++++++++++++++- atuin-client/src/event.rs | 47 ++++++++++++ atuin-client/src/lib.rs | 1 + atuin-client/src/sync.rs | 2 + atuin-common/src/api.rs | 29 ++++++++ .../20220505082442_create-events.sql | 14 ++++ src/command/client/history.rs | 6 +- src/command/client/search/history_list.rs | 13 ++-- 9 files changed, 183 insertions(+), 13 deletions(-) create mode 100644 atuin-client/migrations/20220505083406_create-events.sql create mode 100644 atuin-client/src/event.rs create mode 100644 atuin-server/migrations/20220505082442_create-events.sql diff --git a/atuin-client/migrations/20220505083406_create-events.sql b/atuin-client/migrations/20220505083406_create-events.sql new file mode 100644 index 0000000..f6cafeb --- /dev/null +++ b/atuin-client/migrations/20220505083406_create-events.sql @@ -0,0 +1,11 @@ +create table if not exists events ( + id text primary key, + timestamp integer not null, + hostname text not null, + event_type text not null, + + history_id text not null +); + +-- Ensure there is only ever one of each event type per history item +create unique index history_event_idx ON events(event_type, history_id); diff --git a/atuin-client/src/database.rs b/atuin-client/src/database.rs index ba28daf..b0d05db 100644 --- a/atuin-client/src/database.rs +++ b/atuin-client/src/database.rs @@ -13,6 +13,7 @@ use sqlx::{ }; use super::{ + event::{Event, EventType}, history::History, ordering, settings::{FilterMode, SearchMode}, @@ -61,6 +62,8 @@ pub trait Database: Send + Sync { async fn update(&self, h: &History) -> Result<()>; async fn history_count(&self) -> Result; + async fn event_count(&self) -> Result; + async fn merge_events(&self) -> Result; async fn first(&self) -> Result; async fn last(&self) -> Result; @@ -115,6 +118,27 @@ impl Sqlite { Ok(()) } + async fn save_event(tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, e: &Event) -> Result<()> { + let event_type = match e.event_type { + EventType::Create => "create", + EventType::Delete => "delete", + }; + + sqlx::query( + "insert or ignore into events(id, timestamp, hostname, event_type, history_id) + values(?1, ?2, ?3, ?4, ?5)", + ) + .bind(e.id.as_str()) + .bind(e.timestamp.timestamp_nanos()) + .bind(e.hostname.as_str()) + .bind(event_type) + .bind(e.history_id.as_str()) + .execute(tx) + .await?; + + Ok(()) + } + async fn save_raw(tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, h: &History) -> Result<()> { sqlx::query( "insert or ignore into history(id, timestamp, duration, exit, command, cwd, session, hostname) @@ -152,9 +176,11 @@ impl Sqlite { impl Database for Sqlite { async fn save(&mut self, h: &History) -> Result<()> { debug!("saving history to sqlite"); + let event = Event::new_create(h); let mut tx = self.pool.begin().await?; Self::save_raw(&mut tx, h).await?; + Self::save_event(&mut tx, &event).await?; tx.commit().await?; Ok(()) @@ -166,7 +192,9 @@ impl Database for Sqlite { let mut tx = self.pool.begin().await?; for i in h { - Self::save_raw(&mut tx, i).await? + let event = Event::new_create(i); + Self::save_raw(&mut tx, i).await?; + Self::save_event(&mut tx, &event).await?; } tx.commit().await?; @@ -302,6 +330,49 @@ impl Database for Sqlite { Ok(res) } + async fn event_count(&self) -> Result { + let res: i64 = sqlx::query_scalar("select count(1) from events") + .fetch_one(&self.pool) + .await?; + + Ok(res) + } + + // Ensure that we have correctly merged the event log + async fn merge_events(&self) -> Result { + // Ensure that we do not have more history locally than we do events. + // We can think of history as the merged log of events. There should never be more history than + // events, and the only time this could happen is if someone is upgrading from an old Atuin version + // from before we stored events. + let history_count = self.history_count().await?; + let event_count = self.event_count().await?; + + if history_count > event_count { + // pass an empty context, because with global listing we don't care + let no_context = Context { + cwd: String::from(""), + session: String::from(""), + hostname: String::from(""), + }; + + // We're just gonna load everything into memory here. That sucks, I know, sorry. + // But also even if you have a LOT of history that should be fine, and we're only going to be doing this once EVER. + let all_the_history = self + .list(FilterMode::Global, &no_context, None, false) + .await?; + + let mut tx = self.pool.begin().await?; + for i in all_the_history.iter() { + // A CREATE for every single history item is to be expected. + let event = Event::new_create(i); + Self::save_event(&mut tx, &event).await?; + } + tx.commit().await?; + } + + Ok(0) + } + async fn history_count(&self) -> Result { let res: (i64,) = sqlx::query_as("select count(1) from history") .fetch_one(&self.pool) diff --git a/atuin-client/src/event.rs b/atuin-client/src/event.rs new file mode 100644 index 0000000..4e76c07 --- /dev/null +++ b/atuin-client/src/event.rs @@ -0,0 +1,47 @@ +use chrono::Utc; +use serde::{Deserialize, Serialize}; + +use crate::history::History; +use atuin_common::utils::uuid_v4; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum EventType { + Create, + Delete, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow)] +pub struct Event { + pub id: String, + pub timestamp: chrono::DateTime, + pub hostname: String, + pub event_type: EventType, + + pub history_id: String, +} + +impl Event { + pub fn new_create(history: &History) -> Event { + Event { + id: uuid_v4(), + timestamp: history.timestamp, + hostname: history.hostname.clone(), + event_type: EventType::Create, + + history_id: history.id.clone(), + } + } + + pub fn new_delete(history_id: &str) -> Event { + let hostname = format!("{}:{}", whoami::hostname(), whoami::username()); + + Event { + id: uuid_v4(), + timestamp: chrono::Utc::now(), + hostname, + event_type: EventType::Create, + + history_id: history_id.to_string(), + } + } +} diff --git a/atuin-client/src/lib.rs b/atuin-client/src/lib.rs index 497c5e7..37d1b0f 100644 --- a/atuin-client/src/lib.rs +++ b/atuin-client/src/lib.rs @@ -11,6 +11,7 @@ pub mod encryption; pub mod sync; pub mod database; +pub mod event; pub mod history; pub mod import; pub mod ordering; diff --git a/atuin-client/src/sync.rs b/atuin-client/src/sync.rs index db1ba89..94ae24c 100644 --- a/atuin-client/src/sync.rs +++ b/atuin-client/src/sync.rs @@ -140,6 +140,8 @@ async fn sync_upload( } pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> { + db.merge_events().await?; + let client = api_client::Client::new( &settings.sync_address, &settings.session_token, diff --git a/atuin-common/src/api.rs b/atuin-common/src/api.rs index f17cfd5..f5d5daa 100644 --- a/atuin-common/src/api.rs +++ b/atuin-common/src/api.rs @@ -65,3 +65,32 @@ pub struct IndexResponse { pub homage: String, pub version: String, } + +// Doubled up with the history sync stuff, because atm we need to support BOTH. +// People are still running old clients, and in some cases _very_ old clients. +#[derive(Debug, Serialize, Deserialize)] +pub enum AddEventRequest { + Create(AddHistoryRequest), + + Delete { + id: String, + timestamp: chrono::DateTime, + hostname: chrono::DateTime, + + // When we delete a history item, we push up an event marking its client + // id as being deleted. + history_id: String, + }, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SyncEventRequest { + pub sync_ts: chrono::DateTime, + pub event_ts: chrono::DateTime, + pub host: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SyncEventResponse { + pub events: Vec, +} diff --git a/atuin-server/migrations/20220505082442_create-events.sql b/atuin-server/migrations/20220505082442_create-events.sql new file mode 100644 index 0000000..57e16ec --- /dev/null +++ b/atuin-server/migrations/20220505082442_create-events.sql @@ -0,0 +1,14 @@ +create type event_type as enum ('create', 'delete'); + +create table events ( + id bigserial primary key, + client_id text not null unique, -- the client-generated ID + user_id bigserial not null, -- allow multiple users + hostname text not null, -- a unique identifier from the client (can be hashed, random, whatever) + timestamp timestamp not null, -- one of the few non-encrypted metadatas + + event_type event_type, + data text not null, -- store the actual history data, encrypted. I don't wanna know! + + created_at timestamp not null default current_timestamp +); diff --git a/src/command/client/history.rs b/src/command/client/history.rs index 6a78adc..c45ee0c 100644 --- a/src/command/client/history.rs +++ b/src/command/client/history.rs @@ -136,10 +136,8 @@ impl Cmd { // It's better for atuin to silently fail here and attempt to // store whatever is ran, than to throw an error to the terminal - let cwd = match env::current_dir() { - Ok(dir) => dir.display().to_string(), - Err(_) => String::from(""), - }; + let cwd = env::current_dir() + .map_or_else(|_| String::new(), |dir| dir.display().to_string()); let h = History::new(chrono::Utc::now(), command, cwd, -1, -1, None, None); diff --git a/src/command/client/search/history_list.rs b/src/command/client/search/history_list.rs index e4d8ee6..fda1098 100644 --- a/src/command/client/search/history_list.rs +++ b/src/command/client/search/history_list.rs @@ -35,14 +35,11 @@ impl<'a> StatefulWidget for HistoryList<'a> { type State = ListState; fn render(mut self, area: Rect, buf: &mut Buffer, state: &mut Self::State) { - let list_area = match self.block.take() { - Some(b) => { - let inner_area = b.inner(area); - b.render(area, buf); - inner_area - } - None => area, - }; + let list_area = self.block.take().map_or(area, |b| { + let inner_area = b.inner(area); + b.render(area, buf); + inner_area + }); if list_area.width < 1 || list_area.height < 1 || self.history.is_empty() { return;