prompt = _build_prompt(incoming_text, files)
try:
async for message in query(prompt=prompt, options=options):
self._remember_session(thread_id, getattr(message, "session_id", None))
_handle_message_for_progress(
message=message,
progress_callback=progress_callback,
seen_tool_use_ids=seen_tool_use_ids,
seen_thinking_message_ids=seen_thinking_message_ids,
)
if isinstance(message, AssistantMessage):
_accumulate_assistant_usage(
message=message,
seen_message_ids=seen_assistant_message_ids,
totals=assistant_usage_totals,
)
text = _assistant_text(message)
if text:
assistant_text = text
continue
if isinstance(message, ResultMessage):
result_message = message
except Exception as exc:
...
def _build_prompt(
incoming_text: str,
files: list[IncomingFileEvent],
) -> AsyncIterator[dict[str, Any]]:
image_files = [f for f in files if f.download_url and _image_mime(f.file_name)]
content: list[dict[str, Any]] | str
if not image_files:
content = incoming_text
else:
parts: list[dict[str, Any]] = []
for f in image_files:
mime = _image_mime(f.file_name)
data = _download_image_bytes(f.download_url)
if data is None:
raise RuntimeError(f"Failed to download image: {f.file_name}")
parts.append({
"type": "image",
"source": {
"type": "base64",
"media_type": mime,
"data": base64.standard_b64encode(data).decode("ascii"),
},
})
parts.append({"type": "text", "text": incoming_text})
content = parts
async def _stream() -> AsyncIterator[dict[str, Any]]:
yield {
"type": "user",
"message": {"role": "user", "content": content},
"parent_tool_use_id": None,
"session_id": None,
}
return _stream()
just input short message but got:
Streaming is required for operations that may take longer than 10 minutes.
just input short message but got:
Streaming is required for operations that may take longer than 10 minutes.