prololo: switch to tokio async channel

This commit is contained in:
Antoine Martin 2021-09-12 19:17:13 +02:00
parent 0837b07a9e
commit a402029dd2
4 changed files with 23 additions and 13 deletions

View file

@ -2,7 +2,6 @@ use std::{
fs::File, fs::File,
io::{BufReader, BufWriter}, io::{BufReader, BufWriter},
path::PathBuf, path::PathBuf,
sync::mpsc::Receiver,
}; };
use anyhow::Context; use anyhow::Context;
@ -11,6 +10,7 @@ use matrix_sdk::{
ruma::events::{room::member::MemberEventContent, StrippedStateEvent}, ruma::events::{room::member::MemberEventContent, StrippedStateEvent},
Client, ClientConfig, Session, SyncSettings, Client, ClientConfig, Session, SyncSettings,
}; };
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::{debug, info}; use tracing::{debug, info};
use crate::{config::ProloloConfig, webhooks::Event}; use crate::{config::ProloloConfig, webhooks::Event};
@ -62,21 +62,29 @@ impl Prololo {
/// ///
/// [`Prololo::init`] **must** be called before this function, otherwise the [`Client`] isn't /// [`Prololo::init`] **must** be called before this function, otherwise the [`Client`] isn't
/// logged in. /// logged in.
pub async fn run(&self, events: Receiver<Event>) { pub async fn run(&self, events: UnboundedReceiver<Event>) {
debug!("running..."); debug!("running...");
let client = self.client.clone(); let client = self.client.clone();
let config = self.config.clone(); let config = self.config.clone();
tokio::task::spawn_blocking(move || { tokio::task::spawn(async move { Self::receive_events(events, client, config).await });
Self::handle_events(events, client, config);
});
self.client.sync(SyncSettings::default()).await self.client.sync(SyncSettings::default()).await
} }
fn handle_events(events: Receiver<Event>, client: Client, config: ProloloConfig) { async fn receive_events(
mut events: UnboundedReceiver<Event>,
client: Client,
config: ProloloConfig,
) {
loop { loop {
let event = events.recv().unwrap(); let event = match events.recv().await {
Some(event) => event,
None => {
info!("all channel senders were dropped, exiting receive loop");
break;
}
};
debug!("received event: {:?}", event); debug!("received event: {:?}", event);
} }
} }

View file

@ -1,11 +1,11 @@
use std::fs::File; use std::fs::File;
use std::io::BufReader; use std::io::BufReader;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::mpsc::sync_channel;
use anyhow::Context; use anyhow::Context;
use clap::Clap; use clap::Clap;
use rocket::routes; use rocket::routes;
use tokio::sync::mpsc::unbounded_channel;
mod bot; mod bot;
use bot::Prololo; use bot::Prololo;
@ -34,7 +34,7 @@ async fn main() -> anyhow::Result<()> {
let config: ProloloConfig = serde_yaml::from_reader(BufReader::new(config_file)) let config: ProloloConfig = serde_yaml::from_reader(BufReader::new(config_file))
.context("couldn't parse config file")?; .context("couldn't parse config file")?;
let (sender, receiver) = sync_channel(42); let (sender, receiver) = unbounded_channel();
let prololo = Prololo::new(config).context("failed to create prololo bot")?; let prololo = Prololo::new(config).context("failed to create prololo bot")?;
prololo.init().await.context("failed to init prololo bot")?; prololo.init().await.context("failed to init prololo bot")?;

View file

@ -1,9 +1,8 @@
use std::sync::{mpsc::SyncSender};
mod github; mod github;
pub use github::{github_webhook, GitHubEvent}; pub use github::{github_webhook, GitHubEvent};
use tokio::sync::mpsc::UnboundedSender;
pub struct EventSender(pub SyncSender<Event>); pub struct EventSender(pub UnboundedSender<Event>);
#[derive(Debug)] #[derive(Debug)]
pub enum Event { pub enum Event {

View file

@ -39,7 +39,10 @@ pub fn github_webhook(
} }
}; };
sender.0.send(Event::GitHub(event)).unwrap(); sender
.0
.send(Event::GitHub(event))
.expect("mpsc channel was closed / dropped");
Status::Ok Status::Ok
} }