Prompts
Quick integration
Paste this prompt to generate minimal Python functions for submitting a batch and retrieving results — useful for prototyping or one-off scripts.Quick integration prompt
Write Python functions to submit and retrieve results from the ScaleDown Batch API.
API details:
Submit batch — POST https://api.scaledown.xyz/v1/batches
- Auth: HTTP header `x-api-key: <your key>`
- Content-Type: application/jsonl
- Body: newline-delimited JSON (JSONL), one item per line. Each line:
{
"custom_id": "<string you assign>",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": "summarize" | "extract" | "classify",
// model-specific fields below
}
}
model="summarize" body fields:
{ "text": "<text>", "instructions": "<optional>", "max_tokens": 20048 }
model="extract" body fields:
{
"text": "<text>",
"entities": { "<label>": "<description>", ... },
"instruction": "<optional>"
}
model="classify" body fields:
{
"text": "<text>",
"labels": [{ "name": "<label>", "rubric": "<yes/no question>" }],
"system_prompt": "<optional>"
}
- Success response (JSON):
{ "batch_id": "<id>", "status": "queued", "total_count": <n> }
Poll status — GET https://api.scaledown.xyz/v1/batches/{batch_id}
- Returns: { "batch_id": "...", "status": "queued|processing|completed|failed", ... }
- Poll until status is "completed" or "failed"
Get output — GET https://api.scaledown.xyz/v1/batches/{batch_id}/output
- Returns JSONL. Each line:
{
"custom_id": "<your id>",
"response": {
"status_code": 200,
"body": {
"model": "summarize" | "extract" | "classify",
"choices": [{ "message": { "content": "<JSON string>" } }]
}
},
"error": null
}
Parse choices[0].message.content as JSON to get the domain result:
summarize → { "summary": "...", "input_chars": n, "output_chars": n }
extract → { "entities": [{ "type": "...", "text": "...", "confidence": 0.99 }] }
classify → { "top_label": "...", "scores": { "<label>": float }, "reasoning": "..." }
Requirements:
- Implement submit_batch(items: list[dict], api_key: str) -> str that sends the JSONL
and returns the batch_id.
- Implement poll_until_done(batch_id: str, api_key: str, interval: int = 5) -> dict
that polls every `interval` seconds until status is "completed" or "failed",
then returns the final status dict.
- Implement get_results(batch_id: str, api_key: str) -> list[dict] that fetches output
and returns a list of parsed result dicts, each with keys: custom_id, model, content, error.
- Raise a ValueError with status code and body on any non-2xx response.
Production-ready
Paste this prompt to generate a fully typed Python service class with error handling, retries, and environment-variable-based configuration.Production-ready prompt
Write a production-quality Python module for integrating the ScaleDown Batch API.
API details:
Submit batch — POST https://api.scaledown.xyz/v1/batches
- Auth: HTTP header `x-api-key: <your key>`
- Content-Type: application/jsonl
- Body: newline-delimited JSON. Each line is a BatchItem:
{
"custom_id": "<string>",
"method": "POST",
"url": "/v1/chat/completions",
"body": { "model": "summarize"|"extract"|"classify", ...model fields... }
}
model="summarize" → body fields: text (str), instructions (str, optional), max_tokens (int, default 20048)
model="extract" → body fields: text (str), entities (dict[str, str]), instruction (str, optional)
model="classify" → body fields: text (str), labels (list[{"name": str, "rubric": str}]), system_prompt (str, optional)
- Response: { "batch_id": str, "status": "queued", "total_count": int }
Poll — GET https://api.scaledown.xyz/v1/batches/{batch_id}
- Response: { "batch_id": str, "status": "queued"|"processing"|"completed"|"failed", ... }
Output — GET https://api.scaledown.xyz/v1/batches/{batch_id}/output
- Response: JSONL. Each line:
{
"custom_id": str,
"response": {
"status_code": int,
"body": { "model": str, "choices": [{ "message": { "content": "<JSON string>" } }] }
},
"error": { "code": str, "message": str } | null
}
Deserialize choices[0].message.content as JSON:
summarize → { "summary": str, "input_chars": int, "output_chars": int }
extract → { "entities": [{ "type": str, "text": str, "confidence": float, "start": int, "end": int }] }
classify → { "top_label": str, "scores": dict[str, float], "reasoning": str, "labels": list[dict] }
Apply these programming principles:
1. Environment configuration — Load the API key from SCALEDOWN_API_KEY. Raise a clear
ValueError at construction time if it is missing.
2. Typed inputs — Define dataclasses:
SummarizeItem(custom_id: str, text: str, instructions: str | None, max_tokens: int)
ExtractItem(custom_id: str, text: str, entities: dict[str, str], instruction: str | None)
ClassifyItem(custom_id: str, text: str, labels: list[dict], system_prompt: str | None)
Define a union type: BatchInputItem = SummarizeItem | ExtractItem | ClassifyItem
3. Typed results — Define dataclasses:
BatchItemResult(custom_id: str, model: str, content: dict, error: dict | None)
BatchResult(batch_id: str, items: list[BatchItemResult])
4. Custom exception — ScaleDownBatchError(status_code: int, message: str).
5. Client class — ScaleDownBatchClient with methods:
submit(items: list[BatchInputItem]) -> str (returns batch_id)
wait(batch_id: str, poll_interval: float = 5.0) -> dict (polls until done, returns status dict)
results(batch_id: str) -> BatchResult (fetches and parses output JSONL)
run(items: list[BatchInputItem], poll_interval: float = 5.0) -> BatchResult
(submit + wait + results in one call)
6. Retry with exponential backoff — On HTTP 5xx from submit or output fetch, retry up to
3 times with delays 2s, 4s, 8s. Raise ScaleDownBatchError after exhausting retries.
7. Type annotations — Full type annotations on all methods and fields.
Async pipeline with callbacks
Paste this prompt to generate an async batch pipeline that fans out results to per-model handler callbacks.Async pipeline prompt
Write an async Python module that submits a ScaleDown batch, polls for completion,
and fans out each completed item to a model-specific callback.
ScaleDown Batch API:
- Submit: POST https://api.scaledown.xyz/v1/batches
Headers: x-api-key, content-type: application/jsonl
Body: JSONL with one item per line:
{ "custom_id": str, "method": "POST", "url": "/v1/chat/completions",
"body": { "model": "summarize"|"extract"|"classify", ...fields... } }
Response: { "batch_id": str, "status": "queued", "total_count": int }
- Poll: GET https://api.scaledown.xyz/v1/batches/{batch_id}
Response: { "status": "queued"|"processing"|"completed"|"failed", ... }
- Output: GET https://api.scaledown.xyz/v1/batches/{batch_id}/output
Response: JSONL. Each line: { "custom_id": str, "response": { "body": { "model": str,
"choices": [{ "message": { "content": "<JSON string>" } }] } }, "error": null|{...} }
Parse choices[0].message.content as JSON for the domain result.
Requirements:
1. Accept a list of batch item dicts and a dict of callbacks:
callbacks: dict[str, Callable[[str, dict], Awaitable[None]]]
Keys are "summarize", "extract", "classify". Each callback receives (custom_id, content_dict).
2. Implement async run_batch(items, callbacks, api_key, poll_interval=5.0) using httpx.AsyncClient.
- POST the JSONL to submit the batch.
- Poll every poll_interval seconds using asyncio.sleep until status is "completed" or "failed".
- If status is "failed", raise a RuntimeError with the batch_id.
- Fetch output JSONL and for each line: parse content, look up the callback by model,
and await it with (custom_id, content_dict). Skip items where error is not null.
3. Run all callbacks concurrently using asyncio.gather once output is fetched.
4. Add a __main__ block demonstrating the API with one summarize, one extract, and one classify item,
printing the custom_id and a short preview of the result in each callback.
What the production prompt generates
The production-ready prompt instructs the AI to apply seven programming principles. Here is an example of the code it produces:Principles encoded in the production-ready prompt: environment-variable config, typed input dataclasses, typed result dataclasses, custom exception class, single-responsibility client with
run() convenience method, retry with exponential backoff, full type annotations.Show Example output from the production-ready prompt
Show Example output from the production-ready prompt
import json
import os
import time
import requests
from dataclasses import dataclass, field
from typing import Any
# ---------------------------------------------------------------------------
# Input types
# ---------------------------------------------------------------------------
@dataclass
class SummarizeItem:
custom_id: str
text: str
instructions: str | None = None
max_tokens: int = 20048
@dataclass
class ExtractItem:
custom_id: str
text: str
entities: dict[str, str] = field(default_factory=dict)
instruction: str | None = None
@dataclass
class ClassifyItem:
custom_id: str
text: str
labels: list[dict[str, str]] = field(default_factory=list)
system_prompt: str | None = None
BatchInputItem = SummarizeItem | ExtractItem | ClassifyItem
# ---------------------------------------------------------------------------
# Result types
# ---------------------------------------------------------------------------
@dataclass
class BatchItemResult:
custom_id: str
model: str
content: dict[str, Any]
error: dict[str, Any] | None
@dataclass
class BatchResult:
batch_id: str
items: list[BatchItemResult]
# ---------------------------------------------------------------------------
# Exception
# ---------------------------------------------------------------------------
class ScaleDownBatchError(Exception):
def __init__(self, status_code: int, message: str) -> None:
super().__init__(f"ScaleDown Batch API error {status_code}: {message}")
self.status_code = status_code
self.message = message
# ---------------------------------------------------------------------------
# Client
# ---------------------------------------------------------------------------
class ScaleDownBatchClient:
BASE_URL = "https://api.scaledown.xyz/v1/batches"
def __init__(self) -> None:
api_key = os.environ.get("SCALEDOWN_API_KEY")
if not api_key:
raise ValueError(
"SCALEDOWN_API_KEY environment variable is missing or empty. "
"Set it with: export SCALEDOWN_API_KEY=your_key_here"
)
self._session = requests.Session()
self._session.headers.update({"x-api-key": api_key})
def _item_to_dict(self, item: BatchInputItem) -> dict[str, Any]:
if isinstance(item, SummarizeItem):
body: dict[str, Any] = {"model": "summarize", "text": item.text, "max_tokens": item.max_tokens}
if item.instructions:
body["instructions"] = item.instructions
elif isinstance(item, ExtractItem):
body = {"model": "extract", "text": item.text, "entities": item.entities}
if item.instruction:
body["instruction"] = item.instruction
else: # ClassifyItem
body = {"model": "classify", "text": item.text, "labels": item.labels}
if item.system_prompt:
body["system_prompt"] = item.system_prompt
return {"custom_id": item.custom_id, "method": "POST", "url": "/v1/chat/completions", "body": body}
def submit(self, items: list[BatchInputItem]) -> str:
jsonl = "\n".join(json.dumps(self._item_to_dict(i)) for i in items).encode()
retry_delays = [2, 4, 8]
last_error: ScaleDownBatchError | None = None
for attempt in range(len(retry_delays) + 1):
if attempt > 0:
time.sleep(retry_delays[attempt - 1])
resp = self._session.post(self.BASE_URL, data=jsonl, headers={"content-type": "application/jsonl"})
if resp.ok:
return resp.json()["batch_id"]
if resp.status_code >= 500:
last_error = ScaleDownBatchError(resp.status_code, resp.text)
continue
raise ScaleDownBatchError(resp.status_code, resp.text)
raise last_error # type: ignore[misc]
def wait(self, batch_id: str, poll_interval: float = 5.0) -> dict[str, Any]:
while True:
resp = self._session.get(f"{self.BASE_URL}/{batch_id}")
if not resp.ok:
raise ScaleDownBatchError(resp.status_code, resp.text)
status = resp.json()
if status["status"] in ("completed", "failed"):
return status
time.sleep(poll_interval)
def results(self, batch_id: str) -> BatchResult:
retry_delays = [2, 4, 8]
last_error: ScaleDownBatchError | None = None
for attempt in range(len(retry_delays) + 1):
if attempt > 0:
time.sleep(retry_delays[attempt - 1])
resp = self._session.get(f"{self.BASE_URL}/{batch_id}/output")
if resp.ok:
break
if resp.status_code >= 500:
last_error = ScaleDownBatchError(resp.status_code, resp.text)
continue
raise ScaleDownBatchError(resp.status_code, resp.text)
else:
raise last_error # type: ignore[misc]
items: list[BatchItemResult] = []
for line in resp.text.splitlines():
if not line.strip():
continue
row = json.loads(line)
error = row.get("error")
if error:
items.append(BatchItemResult(custom_id=row["custom_id"], model="", content={}, error=error))
continue
body = row["response"]["body"]
content = json.loads(body["choices"][0]["message"]["content"])
items.append(BatchItemResult(custom_id=row["custom_id"], model=body["model"], content=content, error=None))
return BatchResult(batch_id=batch_id, items=items)
def run(self, items: list[BatchInputItem], poll_interval: float = 5.0) -> BatchResult:
batch_id = self.submit(items)
self.wait(batch_id, poll_interval=poll_interval)
return self.results(batch_id)
import os
os.environ["SCALEDOWN_API_KEY"] = "your_key_here"
client = ScaleDownBatchClient()
result = client.run([
SummarizeItem(
custom_id="sum-1",
text="Artificial intelligence is transforming industries worldwide...",
instructions="Use bullet points.",
),
ExtractItem(
custom_id="ext-1",
text="Apple Inc. reported Q1 revenue of $119.6B. CEO Tim Cook made the announcement.",
entities={"company": "Name of the company", "revenue": "Revenue figure", "ceo": "CEO name"},
),
ClassifyItem(
custom_id="cls-1",
text="I was charged twice for my subscription this month.",
labels=[
{"name": "billing", "rubric": "Is this text about a billing issue or charge?"},
{"name": "technical", "rubric": "Is this text about a technical problem?"},
{"name": "general", "rubric": "Is this a general inquiry that does not fit another category?"},
],
),
])
for item in result.items:
if item.error:
print(f"[{item.custom_id}] ERROR: {item.error}")
elif item.model == "summarize":
print(f"[{item.custom_id}] Summary: {item.content['summary']}")
elif item.model == "extract":
for e in item.content["entities"]:
print(f"[{item.custom_id}] {e['type']}: {e['text']}")
elif item.model == "classify":
print(f"[{item.custom_id}] {item.content['top_label']} — {item.content['reasoning']}")