tirea_agentos_server/transport/
transcoder.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use futures::StreamExt;
5use tirea_contract::Transcoder;
6use tokio::sync::Mutex;
7
8use crate::transport::{BoxStream, Endpoint, TransportError};
9
10pub struct TranscoderEndpoint<R, SendMsg>
19where
20 R: Transcoder,
21 SendMsg: Send + 'static,
22{
23 inner: Arc<dyn Endpoint<R::Input, SendMsg>>,
24 recv_transcoder: Mutex<Option<R>>,
25}
26
27impl<R, SendMsg> TranscoderEndpoint<R, SendMsg>
28where
29 R: Transcoder + 'static,
30 SendMsg: Send + 'static,
31{
32 pub fn new(inner: Arc<dyn Endpoint<R::Input, SendMsg>>, recv_transcoder: R) -> Self {
33 Self {
34 inner,
35 recv_transcoder: Mutex::new(Some(recv_transcoder)),
36 }
37 }
38}
39
40#[async_trait]
41impl<R, SendMsg> Endpoint<R::Output, SendMsg> for TranscoderEndpoint<R, SendMsg>
42where
43 R: Transcoder + 'static,
44 SendMsg: Send + 'static,
45{
46 async fn recv(&self) -> Result<BoxStream<R::Output>, TransportError> {
47 let recv_transcoder = self
48 .recv_transcoder
49 .lock()
50 .await
51 .take()
52 .ok_or(TransportError::Closed)?;
53 let inner_stream = self.inner.recv().await?;
54
55 let stream = async_stream::stream! {
56 let mut transcoder = recv_transcoder;
57 let mut inner = inner_stream;
58
59 for event in transcoder.prologue() {
60 yield Ok(event);
61 }
62
63 while let Some(item) = inner.next().await {
64 match item {
65 Ok(input) => {
66 for event in transcoder.transcode(&input) {
67 yield Ok(event);
68 }
69 }
70 Err(e) => {
71 yield Err(e);
72 return;
73 }
74 }
75 }
76
77 for event in transcoder.epilogue() {
78 yield Ok(event);
79 }
80 };
81
82 Ok(Box::pin(stream))
83 }
84
85 async fn send(&self, item: SendMsg) -> Result<(), TransportError> {
86 self.inner.send(item).await
87 }
88
89 async fn close(&self) -> Result<(), TransportError> {
90 self.inner.close().await
91 }
92}