AI 서브태스크가 하나씩 나타나기까지
OpenAI 스트리밍 응답을 실시간으로 파싱하고, SSE로 클라이언트에 전달해서 서브태스크가 하나씩 생겨나는 경험을 만든 이야기.
AI 서브태스크가 하나씩 나타나기까지
Fecit에는 AI가 서브태스크를 자동으로 만들어주는 기능이 있습니다. 제목과 설명을 보고 3~5개의 하위 작업을 제안합니다.
잘 동작하지만, 한 가지 아쉬운 점이 있었습니다. AI가 생각하는 동안 별이 빙글빙글 돌고, 끝나면 서브태스크가 한꺼번에 뿅 나타납니다. 기다리는 시간은 보통 3~5초. 그 사이에 사용자는 아무것도 못 봅니다.
서브태스크가 하나씩 나타나면 어떨까 싶었습니다. AI가 만드는 족족 화면에 추가되는 거죠. 기다리는 느낌도 줄고, 뭔가 일어나고 있다는 걸 눈으로 볼 수 있으니까요.
기존 구조
원래 흐름은 단순했습니다.
- 클라이언트가 API를 호출합니다.
- 서버가 OpenAI에 요청을 보냅니다.
- OpenAI가 JSON 전체를 생성합니다.
- 서버가 JSON을 파싱해서 서브태스크를 한꺼번에 DB에 넣습니다.
- 클라이언트에 응답을 보냅니다.
OpenAI API는 이미 스트리밍으로 호출하고 있었지만, 토큰이 들어올 때마다 full_text에 이어붙이고, 스트림이 끝나면 json.loads(full_text)로 한 번에 파싱했습니다. 스트리밍의 이점을 전혀 쓰지 않고 있었던 겁니다.
스트리밍 중에 파싱하기
아이디어는 간단합니다. JSON이 완성되기를 기다리지 말고, 토큰이 들어올 때마다 “혹시 완성된 서브태스크 오브젝트가 있나?” 확인하는 겁니다.
AI가 생성하는 JSON은 이런 구조입니다.
{
"subTasks": [
{"title": "...", "description": "..."},
{"title": "...", "description": "..."}
]
}
스트리밍 중에는 이런 상태가 됩니다.
{"subTasks": [{"title": "주제 선정", "description": "블로그 주제를 정하고
아직 JSON이 아닙니다. json.loads를 호출하면 에러가 납니다. 닫는 괄호를 붙여서 파싱을 시도하는 방법도 있지만, 불안정합니다. description 중간에 따옴표나 중괄호가 있으면 깨집니다.
결국 brace depth를 추적하는 방식을 택했습니다. [ 이후에 {가 나오면 depth를 올리고, }가 나오면 내립니다. depth가 0으로 돌아오면 하나의 오브젝트가 완성된 겁니다. 문자열 안의 중괄호는 무시합니다.
def _extract_complete_objects(text: str) -> list:
bracket_pos = text.find('[')
if bracket_pos == -1:
return []
results = []
i = bracket_pos + 1
while i < len(text):
obj_start = text.find('{', i)
if obj_start == -1:
break
depth = 0
in_string = False
escape = False
j = obj_start
obj_end = -1
while j < len(text):
ch = text[j]
if escape:
escape = False
elif ch == '\\' and in_string:
escape = True
elif ch == '"' and not escape:
in_string = not in_string
elif not in_string:
if ch == '{':
depth += 1
elif ch == '}':
depth -= 1
if depth == 0:
obj_end = j
break
j += 1
if obj_end == -1:
break
try:
obj = json.loads(text[obj_start:obj_end + 1])
results.append(obj)
except Exception:
pass
i = obj_end + 1
return results
이 함수는 토큰이 들어올 때마다 호출됩니다. 이전에 이미 추출한 오브젝트 수를 기억해두고, 새로 완성된 것만 yield합니다.
하나 만들고, 바로 보내기
파싱된 서브태스크는 async generator로 yield됩니다.
async for sub_task_data in generate_sub_tasks_streaming(...):
# DB에 저장
await task_collection.insert_one(sub_task_doc)
# 부모의 sub_task_links 업데이트
await task_collection.update_one(...)
# SSE 이벤트 발행
publish_event(achiever_id, str(parent_task.id), "update")
await asyncio.sleep(0)
마지막 줄의 asyncio.sleep(0)이 중요합니다. publish_event는 asyncio.create_task()로 이벤트를 발행하는데, 이벤트 루프가 바쁘면 태스크가 실행되지 않습니다. sleep(0)으로 제어를 양보하면, 방금 스케줄된 SSE 태스크가 즉시 실행됩니다. 이 한 줄이 없으면 이벤트가 모여서 한꺼번에 전송됩니다. 한참을 헤맸습니다.
클라이언트는 이미 준비되어 있었다
데스크톱 앱은 이미 SSE로 실시간 동기화를 하고 있었습니다. task_record_updated 이벤트를 받으면 해당 태스크를 다시 fetch하고 화면에 반영합니다.
서버가 서브태스크를 하나 만들 때마다 부모 태스크의 sub_task_links가 갱신되고, SSE 이벤트가 발생합니다. 클라이언트는 부모를 다시 가져오고, subTaskLinks.length가 바뀌면 서브태스크 목록을 다시 조회합니다.
클라이언트 코드를 한 줄도 수정하지 않았습니다. 기존 SSE 인프라가 그대로 작동했습니다.
체감
AI가 서브태스크를 생성하면, 1~2초 간격으로 하나씩 목록에 추가됩니다. AI가 JSON 토큰을 생성하는 속도가 자연스러운 딜레이가 됩니다. 인위적인 setTimeout 같은 건 없습니다.
“로딩 중” 화면을 3초 동안 보는 것과, 서브태스크가 하나씩 나타나는 것. 기능은 같지만 느낌이 다릅니다. 뭔가 만들어지고 있다는 걸 보는 것만으로도 기다림의 무게가 달라집니다.
삽질 기록
순탄하지는 않았습니다.
처음에는 불완전한 JSON에 닫는 괄호를 붙여서 json.loads를 시도했습니다. '"]}', '""]}' 같은 suffix를 여러 개 준비해두고 하나씩 시도하는 방식. 당연히 안정적이지 않았습니다. description에 따옴표가 들어가면 바로 깨졌고, 결국 스트림이 끝난 후 최종 파싱에서만 서브태스크가 나왔습니다. 스트리밍의 의미가 없었습니다.
SSE 이벤트가 발행되는데 클라이언트에 도착하지 않는 문제도 있었습니다. asyncio.create_task()로 스케줄한 발행 태스크가 실행되지 않은 거였습니다. OpenAI 스트림을 소비하느라 이벤트 루프가 바빴던 겁니다. await asyncio.sleep(0) 한 줄로 해결됐지만, 찾는 데 시간이 걸렸습니다.