Add local event log storage (#390)

* Add event data structures

This adds the data structures required to start syncing events, rather
than syncing history directly.

Adjust event

Fix

Add event data structure to client

* Add server event table sql

* Add client event table migration

Adjust migration

* Insert into event table from client

* Add event merge function

Right now this just ensures we have the right amount of events given the
history we have

BUT it will also be used to merge CREATE/DELETE events, resulting in
history being deleted :)

* Make CI happy

* Adjust

* we don't limit history length any more

* Update atuin-client/src/database.rs

Co-authored-by: Conrad Ludgate <conradludgate@gmail.com>

* fix usage

* Fix typo

* New Rust, new clippy stuff

Co-authored-by: Conrad Ludgate <conradludgate@gmail.com>
This commit is contained in:
Ellie Huxtable 2022-11-04 09:08:20 +00:00 committed by GitHub
parent 6bd82abf6c
commit e297b98f72
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 183 additions and 13 deletions

View file

@ -0,0 +1,11 @@
create table if not exists events (
id text primary key,
timestamp integer not null,
hostname text not null,
event_type text not null,
history_id text not null
);
-- Ensure there is only ever one of each event type per history item
create unique index history_event_idx ON events(event_type, history_id);

View file

@ -13,6 +13,7 @@ use sqlx::{
}; };
use super::{ use super::{
event::{Event, EventType},
history::History, history::History,
ordering, ordering,
settings::{FilterMode, SearchMode}, settings::{FilterMode, SearchMode},
@ -61,6 +62,8 @@ pub trait Database: Send + Sync {
async fn update(&self, h: &History) -> Result<()>; async fn update(&self, h: &History) -> Result<()>;
async fn history_count(&self) -> Result<i64>; async fn history_count(&self) -> Result<i64>;
async fn event_count(&self) -> Result<i64>;
async fn merge_events(&self) -> Result<i64>;
async fn first(&self) -> Result<History>; async fn first(&self) -> Result<History>;
async fn last(&self) -> Result<History>; async fn last(&self) -> Result<History>;
@ -115,6 +118,27 @@ impl Sqlite {
Ok(()) Ok(())
} }
async fn save_event(tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, e: &Event) -> Result<()> {
let event_type = match e.event_type {
EventType::Create => "create",
EventType::Delete => "delete",
};
sqlx::query(
"insert or ignore into events(id, timestamp, hostname, event_type, history_id)
values(?1, ?2, ?3, ?4, ?5)",
)
.bind(e.id.as_str())
.bind(e.timestamp.timestamp_nanos())
.bind(e.hostname.as_str())
.bind(event_type)
.bind(e.history_id.as_str())
.execute(tx)
.await?;
Ok(())
}
async fn save_raw(tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, h: &History) -> Result<()> { async fn save_raw(tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, h: &History) -> Result<()> {
sqlx::query( sqlx::query(
"insert or ignore into history(id, timestamp, duration, exit, command, cwd, session, hostname) "insert or ignore into history(id, timestamp, duration, exit, command, cwd, session, hostname)
@ -152,9 +176,11 @@ impl Sqlite {
impl Database for Sqlite { impl Database for Sqlite {
async fn save(&mut self, h: &History) -> Result<()> { async fn save(&mut self, h: &History) -> Result<()> {
debug!("saving history to sqlite"); debug!("saving history to sqlite");
let event = Event::new_create(h);
let mut tx = self.pool.begin().await?; let mut tx = self.pool.begin().await?;
Self::save_raw(&mut tx, h).await?; Self::save_raw(&mut tx, h).await?;
Self::save_event(&mut tx, &event).await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@ -166,7 +192,9 @@ impl Database for Sqlite {
let mut tx = self.pool.begin().await?; let mut tx = self.pool.begin().await?;
for i in h { for i in h {
Self::save_raw(&mut tx, i).await? let event = Event::new_create(i);
Self::save_raw(&mut tx, i).await?;
Self::save_event(&mut tx, &event).await?;
} }
tx.commit().await?; tx.commit().await?;
@ -302,6 +330,49 @@ impl Database for Sqlite {
Ok(res) Ok(res)
} }
async fn event_count(&self) -> Result<i64> {
let res: i64 = sqlx::query_scalar("select count(1) from events")
.fetch_one(&self.pool)
.await?;
Ok(res)
}
// Ensure that we have correctly merged the event log
async fn merge_events(&self) -> Result<i64> {
// Ensure that we do not have more history locally than we do events.
// We can think of history as the merged log of events. There should never be more history than
// events, and the only time this could happen is if someone is upgrading from an old Atuin version
// from before we stored events.
let history_count = self.history_count().await?;
let event_count = self.event_count().await?;
if history_count > event_count {
// pass an empty context, because with global listing we don't care
let no_context = Context {
cwd: String::from(""),
session: String::from(""),
hostname: String::from(""),
};
// We're just gonna load everything into memory here. That sucks, I know, sorry.
// But also even if you have a LOT of history that should be fine, and we're only going to be doing this once EVER.
let all_the_history = self
.list(FilterMode::Global, &no_context, None, false)
.await?;
let mut tx = self.pool.begin().await?;
for i in all_the_history.iter() {
// A CREATE for every single history item is to be expected.
let event = Event::new_create(i);
Self::save_event(&mut tx, &event).await?;
}
tx.commit().await?;
}
Ok(0)
}
async fn history_count(&self) -> Result<i64> { async fn history_count(&self) -> Result<i64> {
let res: (i64,) = sqlx::query_as("select count(1) from history") let res: (i64,) = sqlx::query_as("select count(1) from history")
.fetch_one(&self.pool) .fetch_one(&self.pool)

47
atuin-client/src/event.rs Normal file
View file

@ -0,0 +1,47 @@
use chrono::Utc;
use serde::{Deserialize, Serialize};
use crate::history::History;
use atuin_common::utils::uuid_v4;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum EventType {
Create,
Delete,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow)]
pub struct Event {
pub id: String,
pub timestamp: chrono::DateTime<Utc>,
pub hostname: String,
pub event_type: EventType,
pub history_id: String,
}
impl Event {
pub fn new_create(history: &History) -> Event {
Event {
id: uuid_v4(),
timestamp: history.timestamp,
hostname: history.hostname.clone(),
event_type: EventType::Create,
history_id: history.id.clone(),
}
}
pub fn new_delete(history_id: &str) -> Event {
let hostname = format!("{}:{}", whoami::hostname(), whoami::username());
Event {
id: uuid_v4(),
timestamp: chrono::Utc::now(),
hostname,
event_type: EventType::Create,
history_id: history_id.to_string(),
}
}
}

View file

@ -11,6 +11,7 @@ pub mod encryption;
pub mod sync; pub mod sync;
pub mod database; pub mod database;
pub mod event;
pub mod history; pub mod history;
pub mod import; pub mod import;
pub mod ordering; pub mod ordering;

View file

@ -140,6 +140,8 @@ async fn sync_upload(
} }
pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> { pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> {
db.merge_events().await?;
let client = api_client::Client::new( let client = api_client::Client::new(
&settings.sync_address, &settings.sync_address,
&settings.session_token, &settings.session_token,

View file

@ -65,3 +65,32 @@ pub struct IndexResponse {
pub homage: String, pub homage: String,
pub version: String, pub version: String,
} }
// Doubled up with the history sync stuff, because atm we need to support BOTH.
// People are still running old clients, and in some cases _very_ old clients.
#[derive(Debug, Serialize, Deserialize)]
pub enum AddEventRequest {
Create(AddHistoryRequest),
Delete {
id: String,
timestamp: chrono::DateTime<Utc>,
hostname: chrono::DateTime<Utc>,
// When we delete a history item, we push up an event marking its client
// id as being deleted.
history_id: String,
},
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncEventRequest {
pub sync_ts: chrono::DateTime<chrono::FixedOffset>,
pub event_ts: chrono::DateTime<chrono::FixedOffset>,
pub host: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncEventResponse {
pub events: Vec<String>,
}

View file

@ -0,0 +1,14 @@
create type event_type as enum ('create', 'delete');
create table events (
id bigserial primary key,
client_id text not null unique, -- the client-generated ID
user_id bigserial not null, -- allow multiple users
hostname text not null, -- a unique identifier from the client (can be hashed, random, whatever)
timestamp timestamp not null, -- one of the few non-encrypted metadatas
event_type event_type,
data text not null, -- store the actual history data, encrypted. I don't wanna know!
created_at timestamp not null default current_timestamp
);

View file

@ -136,10 +136,8 @@ impl Cmd {
// It's better for atuin to silently fail here and attempt to // It's better for atuin to silently fail here and attempt to
// store whatever is ran, than to throw an error to the terminal // store whatever is ran, than to throw an error to the terminal
let cwd = match env::current_dir() { let cwd = env::current_dir()
Ok(dir) => dir.display().to_string(), .map_or_else(|_| String::new(), |dir| dir.display().to_string());
Err(_) => String::from(""),
};
let h = History::new(chrono::Utc::now(), command, cwd, -1, -1, None, None); let h = History::new(chrono::Utc::now(), command, cwd, -1, -1, None, None);

View file

@ -35,14 +35,11 @@ impl<'a> StatefulWidget for HistoryList<'a> {
type State = ListState; type State = ListState;
fn render(mut self, area: Rect, buf: &mut Buffer, state: &mut Self::State) { fn render(mut self, area: Rect, buf: &mut Buffer, state: &mut Self::State) {
let list_area = match self.block.take() { let list_area = self.block.take().map_or(area, |b| {
Some(b) => {
let inner_area = b.inner(area); let inner_area = b.inner(area);
b.render(area, buf); b.render(area, buf);
inner_area inner_area
} });
None => area,
};
if list_area.width < 1 || list_area.height < 1 || self.history.is_empty() { if list_area.width < 1 || list_area.height < 1 || self.history.is_empty() {
return; return;