2023-03-20 03:26:54 -06:00
|
|
|
use std::collections::HashSet;
|
2021-04-13 12:14:07 -06:00
|
|
|
use std::convert::TryInto;
|
2023-03-20 03:26:54 -06:00
|
|
|
use std::iter::FromIterator;
|
2021-04-13 12:14:07 -06:00
|
|
|
|
|
|
|
use chrono::prelude::*;
|
|
|
|
use eyre::Result;
|
|
|
|
|
2022-04-25 00:13:30 -06:00
|
|
|
use atuin_common::api::AddHistoryRequest;
|
2023-08-18 14:45:29 -06:00
|
|
|
use crypto_secretbox::Key;
|
2021-04-20 14:53:07 -06:00
|
|
|
|
2022-04-28 11:53:59 -06:00
|
|
|
use crate::{
|
|
|
|
api_client,
|
|
|
|
database::Database,
|
2023-06-21 01:45:23 -06:00
|
|
|
encryption::{decrypt, encrypt, load_key},
|
2023-05-21 09:21:51 -06:00
|
|
|
settings::Settings,
|
2022-04-28 11:53:59 -06:00
|
|
|
};
|
2021-04-13 12:14:07 -06:00
|
|
|
|
2022-04-25 00:13:30 -06:00
|
|
|
pub fn hash_str(string: &str) -> String {
|
|
|
|
use sha2::{Digest, Sha256};
|
|
|
|
let mut hasher = Sha256::new();
|
|
|
|
hasher.update(string.as_bytes());
|
|
|
|
hex::encode(hasher.finalize())
|
|
|
|
}
|
|
|
|
|
2021-04-13 12:14:07 -06:00
|
|
|
// Currently sync is kinda naive, and basically just pages backwards through
|
|
|
|
// history. This means newly added stuff shows up properly! We also just use
|
|
|
|
// the total count in each database to indicate whether a sync is needed.
|
|
|
|
// I think this could be massively improved! If we had a way of easily
|
|
|
|
// indicating count per time period (hour, day, week, year, etc) then we can
|
|
|
|
// easily pinpoint where we are missing data and what needs downloading. Start
|
|
|
|
// with year, then find the week, then the day, then the hour, then download it
|
|
|
|
// all! The current naive approach will do for now.
|
|
|
|
|
|
|
|
// Check if remote has things we don't, and if so, download them.
|
|
|
|
// Returns (num downloaded, total local)
|
2021-04-20 10:07:11 -06:00
|
|
|
async fn sync_download(
|
2023-06-21 01:45:23 -06:00
|
|
|
key: &Key,
|
2021-04-13 12:14:07 -06:00
|
|
|
force: bool,
|
2021-04-20 10:07:11 -06:00
|
|
|
client: &api_client::Client<'_>,
|
|
|
|
db: &mut (impl Database + Send),
|
2021-04-13 12:14:07 -06:00
|
|
|
) -> Result<(i64, i64)> {
|
2021-04-21 11:13:51 -06:00
|
|
|
debug!("starting sync download");
|
|
|
|
|
2023-03-20 03:26:54 -06:00
|
|
|
let remote_status = client.status().await?;
|
|
|
|
let remote_count = remote_status.count;
|
|
|
|
|
|
|
|
// useful to ensure we don't even save something that hasn't yet been synced + deleted
|
2023-06-21 01:45:23 -06:00
|
|
|
let remote_deleted =
|
|
|
|
HashSet::<&str>::from_iter(remote_status.deleted.iter().map(String::as_str));
|
2021-04-13 12:14:07 -06:00
|
|
|
|
2021-04-25 11:21:52 -06:00
|
|
|
let initial_local = db.history_count().await?;
|
2021-04-13 12:14:07 -06:00
|
|
|
let mut local_count = initial_local;
|
|
|
|
|
|
|
|
let mut last_sync = if force {
|
|
|
|
Utc.timestamp_millis(0)
|
|
|
|
} else {
|
2021-04-20 14:53:07 -06:00
|
|
|
Settings::last_sync()?
|
2021-04-13 12:14:07 -06:00
|
|
|
};
|
|
|
|
|
|
|
|
let mut last_timestamp = Utc.timestamp_millis(0);
|
|
|
|
|
|
|
|
let host = if force { Some(String::from("")) } else { None };
|
|
|
|
|
|
|
|
while remote_count > local_count {
|
2021-04-20 10:07:11 -06:00
|
|
|
let page = client
|
2023-06-21 01:45:23 -06:00
|
|
|
.get_history(last_sync, last_timestamp, host.clone())
|
2021-04-20 10:07:11 -06:00
|
|
|
.await?;
|
2021-04-13 12:14:07 -06:00
|
|
|
|
2023-06-21 01:45:23 -06:00
|
|
|
let history: Vec<_> = page
|
|
|
|
.history
|
|
|
|
.iter()
|
|
|
|
// TODO: handle deletion earlier in this chain
|
|
|
|
.map(|h| serde_json::from_str(h).expect("invalid base64"))
|
|
|
|
.map(|h| decrypt(h, key).expect("failed to decrypt history! check your key"))
|
|
|
|
.map(|mut h| {
|
|
|
|
if remote_deleted.contains(h.id.as_str()) {
|
|
|
|
h.deleted_at = Some(chrono::Utc::now());
|
|
|
|
h.command = String::from("");
|
|
|
|
}
|
|
|
|
|
|
|
|
h
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
db.save_bulk(&history).await?;
|
2021-04-13 12:14:07 -06:00
|
|
|
|
2021-04-25 11:21:52 -06:00
|
|
|
local_count = db.history_count().await?;
|
2021-04-13 12:14:07 -06:00
|
|
|
|
2023-06-21 01:45:23 -06:00
|
|
|
if history.len() < remote_status.page_size.try_into().unwrap() {
|
2021-04-21 11:13:51 -06:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2023-06-21 01:45:23 -06:00
|
|
|
let page_last = history
|
2021-04-13 12:14:07 -06:00
|
|
|
.last()
|
|
|
|
.expect("could not get last element of page")
|
|
|
|
.timestamp;
|
|
|
|
|
|
|
|
// in the case of a small sync frequency, it's possible for history to
|
|
|
|
// be "lost" between syncs. In this case we need to rewind the sync
|
|
|
|
// timestamps
|
|
|
|
if page_last == last_timestamp {
|
|
|
|
last_timestamp = Utc.timestamp_millis(0);
|
2022-09-13 13:03:52 -06:00
|
|
|
last_sync -= chrono::Duration::hours(1);
|
2021-04-13 12:14:07 -06:00
|
|
|
} else {
|
|
|
|
last_timestamp = page_last;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-03-20 03:26:54 -06:00
|
|
|
for i in remote_status.deleted {
|
|
|
|
// we will update the stored history to have this data
|
|
|
|
// pretty much everything can be nullified
|
2023-04-05 02:23:09 -06:00
|
|
|
if let Ok(h) = db.load(i.as_str()).await {
|
|
|
|
db.delete(h).await?;
|
|
|
|
} else {
|
|
|
|
info!(
|
|
|
|
"could not delete history with id {}, not found locally",
|
|
|
|
i.as_str()
|
|
|
|
);
|
|
|
|
}
|
2023-03-20 03:26:54 -06:00
|
|
|
}
|
|
|
|
|
2021-04-13 12:14:07 -06:00
|
|
|
Ok((local_count - initial_local, local_count))
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check if we have things remote doesn't, and if so, upload them
|
2021-04-20 10:07:11 -06:00
|
|
|
async fn sync_upload(
|
2023-06-21 01:45:23 -06:00
|
|
|
key: &Key,
|
2021-04-13 12:14:07 -06:00
|
|
|
_force: bool,
|
2021-04-20 10:07:11 -06:00
|
|
|
client: &api_client::Client<'_>,
|
|
|
|
db: &mut (impl Database + Send),
|
2021-04-13 12:14:07 -06:00
|
|
|
) -> Result<()> {
|
2021-04-21 11:13:51 -06:00
|
|
|
debug!("starting sync upload");
|
|
|
|
|
2023-03-24 03:04:57 -06:00
|
|
|
let remote_status = client.status().await?;
|
|
|
|
let remote_deleted: HashSet<String> = HashSet::from_iter(remote_status.deleted.clone());
|
|
|
|
|
2021-04-20 10:07:11 -06:00
|
|
|
let initial_remote_count = client.count().await?;
|
2021-04-13 12:14:07 -06:00
|
|
|
let mut remote_count = initial_remote_count;
|
|
|
|
|
2021-04-25 11:21:52 -06:00
|
|
|
let local_count = db.history_count().await?;
|
2021-04-13 12:14:07 -06:00
|
|
|
|
2021-04-21 11:13:51 -06:00
|
|
|
debug!("remote has {}, we have {}", remote_count, local_count);
|
|
|
|
|
2021-04-13 12:14:07 -06:00
|
|
|
// first just try the most recent set
|
|
|
|
let mut cursor = Utc::now();
|
|
|
|
|
|
|
|
while local_count > remote_count {
|
2023-05-21 09:21:51 -06:00
|
|
|
let last = db.before(cursor, remote_status.page_size).await?;
|
2021-05-09 14:17:24 -06:00
|
|
|
let mut buffer = Vec::new();
|
2021-04-13 12:14:07 -06:00
|
|
|
|
|
|
|
if last.is_empty() {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
for i in last {
|
2023-06-21 01:45:23 -06:00
|
|
|
let data = encrypt(&i, key)?;
|
2021-05-09 15:31:11 -06:00
|
|
|
let data = serde_json::to_string(&data)?;
|
|
|
|
|
2021-04-13 12:14:07 -06:00
|
|
|
let add_hist = AddHistoryRequest {
|
2022-04-12 16:06:19 -06:00
|
|
|
id: i.id,
|
2021-04-13 12:14:07 -06:00
|
|
|
timestamp: i.timestamp,
|
|
|
|
data,
|
2022-04-12 16:06:19 -06:00
|
|
|
hostname: hash_str(&i.hostname),
|
2021-04-13 12:14:07 -06:00
|
|
|
};
|
|
|
|
|
|
|
|
buffer.push(add_hist);
|
|
|
|
}
|
|
|
|
|
|
|
|
// anything left over outside of the 100 block size
|
2021-04-20 10:07:11 -06:00
|
|
|
client.post_history(&buffer).await?;
|
2021-04-13 12:14:07 -06:00
|
|
|
cursor = buffer.last().unwrap().timestamp;
|
2021-04-20 10:07:11 -06:00
|
|
|
remote_count = client.count().await?;
|
2021-04-26 04:50:31 -06:00
|
|
|
|
|
|
|
debug!("upload cursor: {:?}", cursor);
|
2021-04-13 12:14:07 -06:00
|
|
|
}
|
|
|
|
|
2023-03-20 03:26:54 -06:00
|
|
|
let deleted = db.deleted().await?;
|
|
|
|
|
|
|
|
for i in deleted {
|
2023-03-24 03:04:57 -06:00
|
|
|
if remote_deleted.contains(&i.id) {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2023-03-20 03:26:54 -06:00
|
|
|
info!("deleting {} on remote", i.id);
|
|
|
|
client.delete_history(i).await?;
|
|
|
|
}
|
|
|
|
|
2021-04-13 12:14:07 -06:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-04-20 10:07:11 -06:00
|
|
|
pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> {
|
2023-06-21 01:45:23 -06:00
|
|
|
let client = api_client::Client::new(&settings.sync_address, &settings.session_token)?;
|
|
|
|
|
|
|
|
let key = load_key(settings)?; // encryption key
|
2021-04-13 12:14:07 -06:00
|
|
|
|
2023-06-21 01:45:23 -06:00
|
|
|
sync_upload(&key, force, &client, db).await?;
|
2021-04-13 12:14:07 -06:00
|
|
|
|
2023-06-21 01:45:23 -06:00
|
|
|
let download = sync_download(&key, force, &client, db).await?;
|
2021-04-13 12:14:07 -06:00
|
|
|
|
|
|
|
debug!("sync downloaded {}", download.0);
|
|
|
|
|
2021-04-20 14:53:07 -06:00
|
|
|
Settings::save_sync_time()?;
|
2021-04-13 12:14:07 -06:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|