tirea_agentos_server/transport/
transcoder.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use futures::StreamExt;
5use tirea_contract::Transcoder;
6use tokio::sync::Mutex;
7
8use crate::transport::{BoxStream, Endpoint, TransportError};
9
10/// Recv-only protocol transcoder endpoint.
11///
12/// Wraps an inner `Endpoint<R::Input, SendMsg>` and presents
13/// `Endpoint<R::Output, SendMsg>`:
14///
15/// - **recv** (output direction): stateful stream transformation via
16///   `R: Transcoder` — emits prologue, transcodes each item, then emits epilogue.
17/// - **send** (input direction): passes through to the inner endpoint unchanged.
18pub struct TranscoderEndpoint<R, SendMsg>
19where
20    R: Transcoder,
21    SendMsg: Send + 'static,
22{
23    inner: Arc<dyn Endpoint<R::Input, SendMsg>>,
24    recv_transcoder: Mutex<Option<R>>,
25}
26
27impl<R, SendMsg> TranscoderEndpoint<R, SendMsg>
28where
29    R: Transcoder + 'static,
30    SendMsg: Send + 'static,
31{
32    pub fn new(inner: Arc<dyn Endpoint<R::Input, SendMsg>>, recv_transcoder: R) -> Self {
33        Self {
34            inner,
35            recv_transcoder: Mutex::new(Some(recv_transcoder)),
36        }
37    }
38}
39
40#[async_trait]
41impl<R, SendMsg> Endpoint<R::Output, SendMsg> for TranscoderEndpoint<R, SendMsg>
42where
43    R: Transcoder + 'static,
44    SendMsg: Send + 'static,
45{
46    async fn recv(&self) -> Result<BoxStream<R::Output>, TransportError> {
47        let recv_transcoder = self
48            .recv_transcoder
49            .lock()
50            .await
51            .take()
52            .ok_or(TransportError::Closed)?;
53        let inner_stream = self.inner.recv().await?;
54
55        let stream = async_stream::stream! {
56            let mut transcoder = recv_transcoder;
57            let mut inner = inner_stream;
58
59            for event in transcoder.prologue() {
60                yield Ok(event);
61            }
62
63            while let Some(item) = inner.next().await {
64                match item {
65                    Ok(input) => {
66                        for event in transcoder.transcode(&input) {
67                            yield Ok(event);
68                        }
69                    }
70                    Err(e) => {
71                        yield Err(e);
72                        return;
73                    }
74                }
75            }
76
77            for event in transcoder.epilogue() {
78                yield Ok(event);
79            }
80        };
81
82        Ok(Box::pin(stream))
83    }
84
85    async fn send(&self, item: SendMsg) -> Result<(), TransportError> {
86        self.inner.send(item).await
87    }
88
89    async fn close(&self) -> Result<(), TransportError> {
90        self.inner.close().await
91    }
92}