Supporting Sqlite DB Pool and add json Message type

This commit is contained in:
Horhik 2021-04-23 21:54:48 +03:00
parent 7c28f99546
commit 35d2eafdb8
11 changed files with 145 additions and 67 deletions

1
Cargo.lock generated
View file

@ -639,6 +639,7 @@ dependencies = [
"futures-util", "futures-util",
"log", "log",
"ntru", "ntru",
"r2d2",
"r2d2_sqlite", "r2d2_sqlite",
"regex", "regex",
"rusqlite", "rusqlite",

View file

@ -26,6 +26,7 @@ chrono = {version = "0.4.19", features = ["serde"]}
uuid = {version = "0.8.2", features = ["serde" , "v4"]} uuid = {version = "0.8.2", features = ["serde" , "v4"]}
regex = "1.4.5" regex = "1.4.5"
r2d2_sqlite = "0.18.0" r2d2_sqlite = "0.18.0"
r2d2 = "0.8.9"
[dependencies.fcpv2] [dependencies.fcpv2]
path = "../FCPv2" path = "../FCPv2"

View file

@ -148,12 +148,12 @@ pub fn add_user(
name: String, name: String,
insert_key: String, insert_key: String,
sign_key: String, sign_key: String,
id: uuid::Uuid,
conn: &Connection, conn: &Connection,
server_sender: SP, server_sender: SP,
) -> Result<()> { ) -> Result<()> {
let new_id = Uuid::new_v4();
let user = db::types::User { let user = db::types::User {
id: db::types::Id(new_id.clone()), id: Id(id.clone()),
name: name.clone(), name: name.clone(),
sign_key: sign_key.clone(), sign_key: sign_key.clone(),
insert_key: SSK::parse(&insert_key[..]).unwrap(), insert_key: SSK::parse(&insert_key[..]).unwrap(),
@ -161,7 +161,7 @@ pub fn add_user(
my_messages_count: 0, my_messages_count: 0,
}; };
let user_jsoned = crate::api::response::User{ let user_jsoned = crate::api::response::User{
id: new_id.clone().to_string(), id: id.clone().to_string(),
name: name.clone(), name: name.clone(),
sign_key: sign_key.clone(), sign_key: sign_key.clone(),
insert_key: insert_key, insert_key: insert_key,

View file

@ -3,3 +3,4 @@ pub mod response;
pub mod handlers; pub mod handlers;
pub mod selector; pub mod selector;
pub mod identifier; pub mod identifier;
pub mod types;

View file

@ -28,6 +28,7 @@ pub enum Request {
name: String, name: String,
sign_key: String, sign_key: String,
insert_key: String, insert_key: String,
id: uuid::Uuid,
}, // CreateInstance TODO v0.3 }, // CreateInstance TODO v0.3
} }
@ -76,12 +77,13 @@ fn request_LoadMessages_are_correct() {
} }
#[test] #[test]
fn request_AddUser_are_correct() { fn request_AddUser_are_correct() {
let json = "{\"type\":\"addUser\",\"name\":\"john\",\"signKey\":\"USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE\",\"insertKey\":\"USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE\"}"; let json = "{\"type\":\"addUser\",\"name\":\"john\",\"signKey\":\"USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE\",\"insertKey\":\"USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE\",\"id\":\"2d12c305-eb79-489c-b643-f27f1e78a7c0\"\"}";
let parsed: Request = serde_json::from_str(json).unwrap(); let parsed: Request = serde_json::from_str(json).unwrap();
assert_eq!(parsed, Request::AddUser{ assert_eq!(parsed, Request::AddUser{
name: "john".to_string(), name: "john".to_string(),
sign_key: "USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE".to_string(), sign_key: "USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE".to_string(),
insert_key: "USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE".to_string(), insert_key: "USK@bxouok43eKpx3g4WmURjviGispWzYxeByiWRsmYOy5k,Y9j~lPDUoNlSTbZfDNaUajfePBrW~KM6uvHyOGWeA7Q,AQECAAE".to_string(),
id: uuid::Uuid::from_str("2d12c305-eb79-489c-b643-f27f1e78a7c0"),
}) })
} }

View file

@ -9,11 +9,14 @@ use rusqlite;
use rusqlite::Connection; use rusqlite::Connection;
use serde_json::from_str; use serde_json::from_str;
use serde_json::json; use serde_json::json;
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
// server_sender sending data to server thread; // server_sender sending data to server thread;
pub fn request_selector(json: &str, server_sender: SP, conn: &Connection) -> Result<()> { pub fn request_selector(json: &str, server_sender: SP, conn: Pool<SqliteConnectionManager>,) -> Result<()> {
// if let Ok(res) = from_str::<CreateInstanceReq>(&json) { // if let Ok(res) = from_str::<CreateInstanceReq>(&json) {
//TODO v0.3 Add Instances return Ok(()); //TODO v0.3 Add Instances return Ok(());
// } // }
let conn = conn.get().unwrap();
log::info!("matching request..."); log::info!("matching request...");
let parsed: Request = serde_json::from_str(json).unwrap(); let parsed: Request = serde_json::from_str(json).unwrap();
match parsed { match parsed {
@ -62,7 +65,8 @@ pub fn request_selector(json: &str, server_sender: SP, conn: &Connection) -> Res
name, name,
sign_key, sign_key,
insert_key, insert_key,
} => match handlers::add_user(name, insert_key, sign_key, &conn, server_sender.clone()) { id
} => match handlers::add_user(name, insert_key, sign_key, id, &conn, server_sender.clone()) {
Ok(_) => {} Ok(_) => {}
Err(_) => {} Err(_) => {}
}, },

View file

@ -1 +1,11 @@
pub use super::request_types; use crate::db::types::{Time, Id};
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Message {
pub message: String,
pub date: Time,
pub id: Id,
pub from_me: bool,
}

View file

@ -1,5 +1,4 @@
use crate::chat::types::PackedMessage; use crate::chat::types::PackedMessage;
use fcpv2::{types::traits::FcpParser, node::fcp_response::AllData};
use crate::db; use crate::db;
use async_std::{ use async_std::{
io, io,
@ -7,6 +6,7 @@ use async_std::{
task, task,
}; };
use async_tungstenite::{accept_async, tungstenite::Message, WebSocketStream}; use async_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
use fcpv2::{node::fcp_response::AllData, types::traits::FcpParser};
use futures::{ use futures::{
stream::{SplitSink, SplitStream}, stream::{SplitSink, SplitStream},
SinkExt, StreamExt, SinkExt, StreamExt,
@ -21,6 +21,8 @@ use super::stay_awake::request_repeater;
use super::types::{RP, SP}; use super::types::{RP, SP};
use crate::api::selector::request_selector; use crate::api::selector::request_selector;
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
struct FrontMsg { struct FrontMsg {
@ -33,7 +35,7 @@ struct FrontMsg {
pub fn listen_client( pub fn listen_client(
server_sender: SP, server_sender: SP,
client_receiver: RP, client_receiver: RP,
conn: rusqlite::Connection, conn: Pool<SqliteConnectionManager>,
) -> io::Result<()> { ) -> io::Result<()> {
task::block_on(connect_to_client(server_sender, client_receiver, conn)) task::block_on(connect_to_client(server_sender, client_receiver, conn))
} }
@ -41,7 +43,7 @@ pub fn listen_client(
async fn connect_to_client( async fn connect_to_client(
server_sender: SP, server_sender: SP,
client_receiver: RP, client_receiver: RP,
conn: rusqlite::Connection, conn: Pool<SqliteConnectionManager>,
) -> io::Result<()> { ) -> io::Result<()> {
let addr = env::args() let addr = env::args()
.nth(1) .nth(1)
@ -69,9 +71,14 @@ async fn connect_to_client(
sender, sender,
client_receiver, client_receiver,
client_repeater, client_repeater,
conn.clone(),
)); ));
let t2 = task::spawn(connection_for_sending(receiver, server_sender, conn)); let t2 = task::spawn(connection_for_sending(
let t3 = task::spawn(request_repeater(ss)); receiver,
server_sender,
conn.clone(),
));
let t3 = task::spawn(request_repeater(ss, conn.clone()));
t1.await?; t1.await?;
t3.await?; t3.await?;
t2.await?; t2.await?;
@ -84,9 +91,11 @@ async fn connection_for_receiving(
mut sender: SplitSink<WebSocketStream<TcpStream>, Message>, mut sender: SplitSink<WebSocketStream<TcpStream>, Message>,
client_receiver: RP, client_receiver: RP,
server_sender: SP, server_sender: SP,
conn: Pool<SqliteConnectionManager>,
) -> io::Result<()> { ) -> io::Result<()> {
log::info!("Connection for receiving launched"); log::info!("Connection for receiving launched");
// let mut prev: PackedMessage = PackedMessage::FromFreenet("nothing".to_string()); // let mut prev: PackedMessage = PackedMessage::FromFreenet("nothing".to_string());
let db = conn.get().unwrap();
while let Ok(res) = client_receiver.recv() { while let Ok(res) = client_receiver.recv() {
//TODO call client get after receiving NodeHello //TODO call client get after receiving NodeHello
// log::debug!("RES {:?}", &res); // log::debug!("RES {:?}", &res);
@ -107,11 +116,29 @@ async fn connection_for_receiving(
match res_type { match res_type {
Some("AllData") => { Some("AllData") => {
let data = AllData::parse(&r).unwrap(); let data = AllData::parse(&r).unwrap();
log::debug!("GOT mESSAGE {}\n FROM FREENET: {}",data.identifier, data.data ); log::debug!(
server_sender.send(PackedMessage::ToClient(data.data)); "GOT mESSAGE {}\n FROM FREENET: {}",
&data.identifier,
&data.data
);
server_sender.send(PackedMessage::ToClient(data.data.clone()));
//TOOD parse identifier //TOOD parse identifier
let (uuid, id) = crate::api::identifier::parse_message_identifier(&data.identifier); let (uuid, id) =
crate::api::identifier::parse_message_identifier(&data.identifier);
log::debug!("parsed identifier: {:?} {:?}", uuid, id); log::debug!("parsed identifier: {:?} {:?}", uuid, id);
let jsoned: crate::api::types::Message =
serde_json::from_str(&data.data[..]).unwrap();
crate::db::messages::add_message(
crate::db::types::Message {
id: id,
date: jsoned.date,
user_id: jsoned.id,
message: jsoned.message,
from_me: jsoned.from_me,
},
&db,
)
.unwrap()
/*async_std::task::block_on( /*async_std::task::block_on(
sender sender
// TODO freenet_response_handler // TODO freenet_response_handler
@ -135,7 +162,7 @@ async fn connection_for_receiving(
async fn connection_for_sending( async fn connection_for_sending(
mut receiver: SplitStream<WebSocketStream<TcpStream>>, mut receiver: SplitStream<WebSocketStream<TcpStream>>,
server_sender: SP, server_sender: SP,
conn: rusqlite::Connection, conn: Pool<SqliteConnectionManager>,
) -> io::Result<()> { ) -> io::Result<()> {
let ss = server_sender.clone(); let ss = server_sender.clone();
log::info!("Connection for sending launched"); log::info!("Connection for sending launched");
@ -144,7 +171,7 @@ async fn connection_for_sending(
if let Some(msg) = new_msg.await { if let Some(msg) = new_msg.await {
let jsoned = msg.expect("Falied to unwrap gotted message"); let jsoned = msg.expect("Falied to unwrap gotted message");
log::info!("new request"); log::info!("new request");
match request_selector(&jsoned.to_string()[..], server_sender.clone(), &conn) { match request_selector(&jsoned.to_string()[..], server_sender.clone(), conn.clone()) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
log::error!("{}", e); log::error!("{}", e);

View file

@ -6,11 +6,13 @@ use std::sync::mpsc::Sender;
use std::path::Path; use std::path::Path;
use std::fs::File; use std::fs::File;
use r2d2_sqlite::SqliteConnectionManager;
use r2d2::Pool;
type SP = Sender<PackedMessage>; type SP = Sender<PackedMessage>;
pub async fn request_repeater(ss: SP) -> io::Result<()> { pub async fn request_repeater(ss: SP, conn: Pool<SqliteConnectionManager>) -> io::Result<()> {
let db = crate::db::start_db().unwrap(); let db = conn.get().unwrap();
// loop { // loop {
//TODO create a field with tracked users //TODO create a field with tracked users
log::debug!("Request Repeater Started!"); log::debug!("Request Repeater Started!");

View file

@ -1,7 +1,16 @@
use rusqlite::{params, Connection, Result}; use rusqlite::{params, Connection, Result};
pub mod messages;
pub mod types; pub mod types;
pub mod users; pub mod users;
pub mod messages;
extern crate r2d2;
extern crate r2d2_sqlite;
extern crate rusqlite;
use core::marker::Send;
use r2d2_sqlite::SqliteConnectionManager;
pub use r2d2::{ManageConnection, Pool};
use types::DB_PATH; use types::DB_PATH;
@ -49,13 +58,33 @@ fn create_db(conn: &Connection) -> Result<()> {
Ok(()) Ok(())
} }
pub fn start_db() -> Result<Connection> { pub fn start_db() -> std::result::Result<Pool<SqliteConnectionManager>, r2d2::Error> where
{
let manager = r2d2_sqlite::SqliteConnectionManager::file(DB_PATH);
if !std::path::Path::new(DB_PATH).exists() {
let conn = Connection::open(DB_PATH).expect("failed to connect to db");
println!("{}", conn.is_autocommit());
create_db(&conn).expect("failed to create db");
Ok(r2d2::Pool::new(manager).unwrap())
} else {
Ok(r2d2::Pool::new(manager).unwrap())
}
}
pub fn old_start_db() -> Result<Connection> {
if !std::path::Path::new(DB_PATH).exists() { if !std::path::Path::new(DB_PATH).exists() {
let conn = Connection::open(DB_PATH)?; let conn = Connection::open(DB_PATH)?;
println!("{}", conn.is_autocommit()); println!("{}", conn.is_autocommit());
match create_db(&conn) { match create_db(&conn) {
Ok(_) => {log::info!("Successfully created DB!"); Ok(conn)}, Ok(_) => {
Err(e) => {log::error!("Failed to create DB: {:?}",e ); Err(e)}, log::info!("Successfully created DB!");
Ok(conn)
}
Err(e) => {
log::error!("Failed to create DB: {:?}", e);
Err(e)
}
} }
} else { } else {
Connection::open(DB_PATH) Connection::open(DB_PATH)
@ -63,8 +92,8 @@ pub fn start_db() -> Result<Connection> {
} }
// ### A little stupid test ### // // ### A little stupid test ### //
//let conn = db::start_db().unwrap(); //let conn = db::start_db().unwrap();
/* /*
users::add_user(db::types::User{ users::add_user(db::types::User{
id: 9349, id: 9349,
name: "Nick".to_string(), name: "Nick".to_string(),
@ -106,4 +135,4 @@ pub fn start_db() -> Result<Connection> {
.unwrap(); .unwrap();
let messages = db::messages::select_message_by_id(9349, 3, &conn).unwrap(); let messages = db::messages::select_message_by_id(9349, 3, &conn).unwrap();
*/ */

View file

@ -46,8 +46,9 @@ use std::{
fn main() -> io::Result<()> { fn main() -> io::Result<()> {
SimpleLogger::new().init().unwrap(); SimpleLogger::new().with_level(log::LevelFilter::Debug).init().unwrap();
//TODO Add connection pool for ruqlite
let db = db::start_db().unwrap(); let db = db::start_db().unwrap();
//let manager = r2d2_foodb::FooConnectionManager::new("localhost:1234"); //let manager = r2d2_foodb::FooConnectionManager::new("localhost:1234");