SSE 실시간 동기화 구현기
FastAPI 서버에 SSE를 붙이고, 데스크톱과 모바일 클라이언트를 연결하기까지 부딪힌 문제들.
SSE 실시간 동기화 구현기
이전 글에서 모바일과 데스크톱 사이의 실시간 동기화를 SSE로 하겠다고 결정했습니다. 이 글은 그걸 실제로 만들면서 부딪힌 문제들에 대한 기록입니다.
서버: FastAPI + 인메모리 pub/sub
서버는 Python FastAPI입니다. SSE는 StreamingResponse로 구현할 수 있습니다. generator 함수가 yield하면 클라이언트에 이벤트가 전달됩니다.
핵심은 EventManager입니다. 유저별로 구독자(asyncio.Queue)를 관리합니다. 누군가 태스크를 수정하면 해당 유저의 모든 구독자 Queue에 이벤트를 넣고, 각 SSE 스트림의 generator가 Queue에서 꺼내서 yield합니다.
class EventManager:
def subscribe(self, achiever_id: str) -> Subscriber:
# Queue를 만들어서 구독자 set에 추가
async def publish(self, achiever_id, event_type, data, exclude_connection_id):
# 해당 유저의 모든 구독자 Queue에 이벤트 전달
간단한 구조입니다. Redis나 메시지 큐 없이 인메모리로 동작합니다. 서버가 한 대니까 이걸로 충분합니다.
본인 기기 제외
첫 번째 문제. 모바일에서 태스크를 완료하면, 모바일 자신에게도 이벤트가 갑니다. 이미 UI에 반영했는데 또 fetch하게 됩니다.
해결: SSE 연결 시 서버가 고유한 connection_id를 발급합니다. 클라이언트는 이후 모든 REST 요청에 X-Connection-Id 헤더를 실어 보냅니다. 서버는 이벤트를 발행할 때 해당 connection을 제외합니다.
SSE 연결 → 서버: "당신의 connection_id는 abc123"
REST 요청 → 클라이언트: "X-Connection-Id: abc123"
서버 publish → abc123 제외, 나머지에게 전달
모든 mutation 엔드포인트(약 100개)에 이 로직을 추가했습니다.
데스크톱: fetch streaming
데스크톱은 Tauri(웹 기반)이니 브라우저의 EventSource API를 쓰면 될 것 같았지만, EventSource는 커스텀 헤더를 지원하지 않습니다. Bearer 토큰을 보낼 수가 없습니다.
그래서 fetch + ReadableStream으로 직접 구현했습니다. response body를 reader로 읽으면서 \n\n으로 이벤트를 파싱합니다.
Vite 프록시 버퍼링
개발 환경에서 SSE가 안 됐습니다. 서버에서 이벤트를 보내는데 클라이언트가 못 받습니다.
원인은 Vite 개발 서버의 HTTP 프록시였습니다. 프록시가 SSE 응답을 버퍼링하고 있었습니다. SSE 경로에 대해 별도 프록시 설정을 추가하고 버퍼링을 비활성화해야 했습니다.
"/api/achiever/events/stream": {
target: "http://localhost:8001",
configure: (proxy) => {
proxy.on("proxyRes", (proxyRes) => {
proxyRes.headers["x-accel-buffering"] = "no";
});
},
},
프로덕션에서는 이 문제가 없습니다. Vite 프록시를 안 거치니까요.
모바일: fetch는 안 된다
React Native에서 fetch의 streaming(response.body.getReader())을 시도했습니다. 연결은 되는데 데이터를 못 받습니다. fetch 응답이 완료될 때까지 body를 읽을 수 없었습니다. SSE는 응답이 끝나지 않는 스트림인데, React Native의 fetch는 그걸 처리하지 못했습니다.
해결: XMLHttpRequest로 교체했습니다. XHR의 onprogress 이벤트는 데이터가 도착할 때마다 호출됩니다. responseText에서 마지막으로 읽은 위치 이후의 새 데이터만 잘라서 파싱합니다.
xhr.onprogress = () => {
const newData = xhr.responseText.substring(this.lastIndex);
this.lastIndex = xhr.responseText.length;
// newData를 \n\n으로 분리해서 이벤트 파싱
};
이 방식은 안정적으로 동작합니다.
포그라운드에서만
모바일은 백그라운드로 가면 OS가 네트워크 연결을 끊을 수 있습니다. SSE를 상시 유지하면 배터리도 소모됩니다. 그래서 AppState를 감시해서 포그라운드일 때만 연결하고, 백그라운드로 가면 끊습니다.
기존의 syncAt 기반 증분 동기화가 있어서, 포그라운드로 돌아올 때 놓친 변경사항은 sync가 잡아줍니다.
연결 끊김과 재연결
SSE 연결은 언제든 끊길 수 있습니다. 서버 재시작, 네트워크 변경, 클라이언트 슬립.
AbortController로 연결을 관리하고, 끊기면 3초 후 자동 재연결합니다. 서버 쪽에서는 5초마다 클라이언트 disconnect를 체크해서 죽은 구독자를 정리합니다.
구독자 누수
디버깅 중 발견한 문제. 서버의 구독자 수가 계속 늘어났습니다. 클라이언트가 끊겼는데 서버에서 unsubscribe가 안 됐습니다.
원인은 두 가지였습니다. 서버의 disconnect 감지 간격이 30초로 너무 길었고, 클라이언트에서 disconnect() 시 진행 중인 fetch를 abort하지 않아서 연결이 좀비 상태로 남아 있었습니다.
디테일 패널 동기화
리스트는 갱신되는데 디테일 패널은 안 바뀌는 문제가 있었습니다. 디테일 컴포넌트가 로컬 state(title, description 등)를 따로 관리하고 있어서, prop이 바뀌어도 로컬 state는 그대로였습니다.
로컬 state를 리셋하는 effect의 의존성에 task.revision을 추가해서 해결했습니다. 서버에서 태스크가 수정될 때마다 revision이 올라가니까, 다른 기기에서 온 변경을 감지할 수 있습니다.
결과
모바일에서 태스크를 완료하면 데스크톱 리스트와 디테일이 즉시 갱신됩니다. 데스크톱에서 제목을 바꾸면 모바일에서도 곧바로 반영됩니다. 새로고침 없이.
인메모리 pub/sub, SSE 스트림, connection_id 기반 본인 제외. 복잡한 인프라 없이 기존 HTTP 위에서 실시간 동기화를 만들었습니다.