The closest current feature to what you request: API projects
You can use a project as a container for a single product and a single API key. At least there, the project ID is transparent end to end, the project can be named, and the ID is not shown in some obfuscated form.
Keys are secret-shaped with an undocumented obfuscation layer
Secret types of secret keys used by OpenAI
OpenAI has no documentation about the internal or transformed API key identifiers, which cannot be used to make calls, that appear in usage on the Platform site and in the line items or buckets of the usage API. You are almost expected to see weird-looking API keys and then map them back yourself by making some calls and seeing what shows up in billing. What is needed is foundational documentation of why they show a “key” you’ve never seen before.
This internal, repeatable ID should also be reported alongside each API key in the Platform site's API key creation interface, so that usage data can be made sense of to start with.
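Until that exists, the map has to be bootstrapped from usage data. A minimal sketch of the "see what shows up" step, assuming a usage response is a page of buckets whose `results` entries carry an `api_key_id` when the request was grouped by it. The helper name `collect_api_key_ids` and the sample payload are made up for illustration, not taken from OpenAI's documentation:

```python
from collections import Counter
from typing import Any


def collect_api_key_ids(page: dict[str, Any]) -> Counter[str]:
    """Count how many result rows mention each api_key_id in one usage page."""
    seen: Counter[str] = Counter()
    for bucket in page.get("data", []):
        for result in bucket.get("results", []):
            api_key_id = result.get("api_key_id")
            # Ungrouped rows may carry a null ID; skip anything that is not a string.
            if isinstance(api_key_id, str):
                seen[api_key_id] += 1
    return seen


# Hypothetical payload shaped like a usage response grouped by api_key_id.
sample_page = {
    "object": "page",
    "data": [
        {"object": "bucket", "results": [
            {"api_key_id": "key_1234567890abcdef", "input_tokens": 120},
            {"api_key_id": "key_abcdef1234567890", "input_tokens": 45},
        ]},
        {"object": "bucket", "results": [
            {"api_key_id": "key_1234567890abcdef", "input_tokens": 300},
            {"api_key_id": None, "input_tokens": 7},
        ]},
    ],
}

for key_id, rows in collect_api_key_ids(sample_page).most_common():
    print(f"{key_id}: seen in {rows} result row(s)")
```

Run one distinctive call per real key, fetch the usage again, and whichever ID gains a row is the one to write down next to that key's name.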
Name aliases: yes, useful
Naming API keys is a solid idea. It would probably have to be done at creation time on the Platform site. A number of API surfaces would potentially consume that name, and where those cannot be refactored, OpenAI would have to place a “data amender” that enhances database returns in API call responses.
Code: one place to put your key:name map and have the output amended.
I took your concern, added about 5x as much of my own specification writing and about 50x as much API documentation, and made it the input to my coding AI.
Result: a proxy for OpenAI's RESTful endpoints under /v1/organization/* that amends any response containing an API key ID with a parallel key name. You still have to work out the ID-to-name pairs yourself and provide them in your own map, demonstrated here as hard-coded. The console reports API key map matches and non-matches in the usage data it relays and potentially amends.
requirements.txt:

fastapi>=0.111,<1
httpx[http2]>=0.27,<1
uvicorn[standard]>=0.30,<1

# OpenAI organization usage/cost API key name proxy
This is a minimal asyncio proxy for OpenAI organization usage and cost endpoints.
It forwards requests shaped like:
```text
/v1/organization/costs
/v1/organization/usage/completions
/v1/organization/usage/embeddings
/v1/organization/usage/images
/v1/organization/usage/audio_speeches
/v1/organization/usage/audio_transcriptions
/v1/organization/usage/moderations
/v1/organization/usage/vector_stores
/v1/organization/usage/code_interpreter_sessions
/v1/organization/usage/file_search_calls
/v1/organization/usage/web_search_calls
```
The proxy does not store or mint bearer tokens. It forwards the caller's
`Authorization` header to OpenAI and returns OpenAI's HTTP status code and error
shape when OpenAI responds with an error.
For JSON responses, every object containing `api_key_id` receives an additional
field:
```json
{
  "api_key_id": "key_1234567890abcdef",
  "api_key_name": "checkout-service-production"
}
```
If the `api_key_id` is absent from the proxy's demonstration map, the new field
is set to `"unknown"` and the console log records the unmatched value.
## Install
```bash
cd v1_organization_proxy
python -m venv .venv
. .venv/bin/activate
pip install -r requirements.txt
```
## Run
```bash
python app.py --host 127.0.0.1 --port 18080
```
Environment variables are also supported:
```bash
OPENAI_ORG_PROXY_HOST=0.0.0.0 \
OPENAI_ORG_PROXY_PORT=18080 \
OPENAI_ORG_PROXY_TIMEOUT_SECONDS=60 \
python app.py
```
## Call
The endpoint path mirrors OpenAI's organization endpoints:
```bash
curl "http://127.0.0.1:18080/v1/organization/costs?start_time=1730419200&limit=1&group_by=api_key_id" \
-H "Authorization: Bearer $OPENAI_ADMIN_KEY" \
-H "Content-Type: application/json"
```
The proxy also accepts `group_by=api_key_name` as a caller convenience and sends
`group_by=api_key_id` upstream, because OpenAI currently groups by key ID:
```bash
curl "http://127.0.0.1:18080/v1/organization/costs?start_time=1730419200&group_by=api_key_name" \
-H "Authorization: Bearer $OPENAI_ADMIN_KEY"
```
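As an illustration of that rewrite, here is a standalone sketch of what happens to the query string before it goes upstream. The function name `to_upstream_query` is hypothetical and only mirrors the proxy's behavior; the comma-separated form is handled the same way the proxy handles it, not because OpenAI is known to accept it:

```python
from urllib.parse import parse_qsl, urlencode


def to_upstream_query(query: str) -> str:
    """Swap the proxy-only group name for the one OpenAI understands."""
    rewritten = []
    for key, value in parse_qsl(query, keep_blank_values=True):
        if key in {"group_by", "group_by[]"}:
            # Rewrite each comma-separated item, leaving other group names alone.
            value = ",".join(
                "api_key_id" if item == "api_key_name" else item
                for item in value.split(",")
            )
        rewritten.append((key, value))
    return urlencode(rewritten)


print(to_upstream_query("start_time=1730419200&group_by=api_key_name"))
print(to_upstream_query("start_time=1730419200&group_by=project_id,api_key_name"))
```

All other parameters pass through unchanged, so existing callers that already send `group_by=api_key_id` see no difference.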
## Configure the internal map
Edit `API_KEY_ID_TO_NAME` in `app.py`:
```python
API_KEY_ID_TO_NAME = {
    "key_1234567890abcdef": "checkout-service-production",
    "key_abcdef1234567890": "mobile-app-production",
}
```
The identifiers are OpenAI organization usage/cost API key IDs, not secret key
values. The mapping block is intentionally small and hard-coded for reference
use. In a production deployment, replace it with a private configuration file,
database, or internal service.
## Use from OpenAI SDK clients
Point the SDK base URL at the proxy's host and high port:
```python
from openai import OpenAI

client = OpenAI(
    api_key="admin-key-here",
    base_url="http://127.0.0.1:18080/v1",
)
```
Requests made by the SDK to `/organization/*` are forwarded to OpenAI and JSON
responses are amended with `api_key_name`.
app.py:

import argparse
import asyncio
import json
import logging
import os
import sys
from collections.abc import AsyncIterator, Iterable
from contextlib import asynccontextmanager
from dataclasses import dataclass
from urllib.parse import urlencode

import httpx
import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response

type JsonValue = None | bool | int | float | str | list["JsonValue"] | dict[str, "JsonValue"]

OPENAI_BASE_URL = "https://api.openai.com"
PROXY_NAME_FIELD = "api_key_name"
UPSTREAM_API_KEY_GROUP = "api_key_id"
PROXY_API_KEY_GROUP = "api_key_name"
API_KEY_ID_FIELD = "api_key_id"

HOP_BY_HOP_HEADERS = {
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailer",
    "transfer-encoding",
    "upgrade",
}

REQUEST_HEADERS_TO_DROP = HOP_BY_HOP_HEADERS | {
    "host",
    "content-length",
}

RESPONSE_HEADERS_TO_DROP = HOP_BY_HOP_HEADERS | {
    "content-encoding",
    "content-length",
}

# Demonstration map. Replace with an external file, database, or internal API.
# The keys are OpenAI organization usage/cost API key identifiers, not secret
# API key values.
API_KEY_ID_TO_NAME = {
    "key_1234567890abcdef": "checkout-service-production",
    "key_abcdef1234567890": "mobile-app-production",
    "key_1111222233334444": "internal-analytics",
}


@dataclass(frozen=True)
class ProxyConfig:
    host: str
    port: int
    openai_base_url: str
    request_timeout_seconds: float


@dataclass(frozen=True)
class RewriteStats:
    recognized: int
    named: int
    unknown: int


class ApiKeyNameAppender:
    def __init__(self, api_key_names: dict[str, str], logger: logging.Logger) -> None:
        self._api_key_names = api_key_names
        self._logger = logger

    def append_names(self, payload: JsonValue, *, request_label: str) -> tuple[JsonValue, RewriteStats]:
        counters = {"recognized": 0, "named": 0, "unknown": 0}
        amended = self._append_names(payload, location="$", request_label=request_label, counters=counters)
        stats = RewriteStats(
            recognized=counters["recognized"],
            named=counters["named"],
            unknown=counters["unknown"],
        )
        return amended, stats

    def _append_names(
        self,
        value: JsonValue,
        *,
        location: str,
        request_label: str,
        counters: dict[str, int],
    ) -> JsonValue:
        if isinstance(value, list):
            return [
                self._append_names(item, location=f"{location}[{index}]", request_label=request_label, counters=counters)
                for index, item in enumerate(value)
            ]
        if isinstance(value, dict):
            amended: dict[str, JsonValue] = {
                key: self._append_names(
                    child,
                    location=f"{location}.{key}",
                    request_label=request_label,
                    counters=counters,
                )
                for key, child in value.items()
            }
            if API_KEY_ID_FIELD in value:
                counters["recognized"] += 1
                api_key_id = value[API_KEY_ID_FIELD]
                api_key_name = self._api_key_names.get(api_key_id) if isinstance(api_key_id, str) else None
                if api_key_name:
                    counters["named"] += 1
                    self._logger.info(
                        "%s: recognized %s at %s as %s",
                        request_label,
                        API_KEY_ID_FIELD,
                        location,
                        api_key_name,
                    )
                else:
                    counters["unknown"] += 1
                    api_key_name = "unknown"
                    self._logger.info(
                        "%s: recognized %s at %s but no internal name matched value %r",
                        request_label,
                        API_KEY_ID_FIELD,
                        location,
                        api_key_id,
                    )
                amended[PROXY_NAME_FIELD] = api_key_name
            return amended
        return value


def create_app(config: ProxyConfig, api_key_names: dict[str, str] | None = None) -> FastAPI:
    logger = logging.getLogger("organization-usage-proxy")
    # Only None falls back to the demonstration map; an explicitly passed
    # empty map is respected instead of being silently replaced.
    names = API_KEY_ID_TO_NAME if api_key_names is None else api_key_names
    appender = ApiKeyNameAppender(names, logger)

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncIterator[None]:
        # One shared upstream client for the lifetime of the server.
        app.state.http_client = httpx.AsyncClient(
            base_url=config.openai_base_url,
            follow_redirects=False,
            http2=True,
            timeout=httpx.Timeout(config.request_timeout_seconds),
        )
        logger.info(
            "proxy listening on http://%s:%s and forwarding /v1/organization/* to %s",
            config.host,
            config.port,
            config.openai_base_url,
        )
        try:
            yield
        finally:
            client: httpx.AsyncClient = app.state.http_client
            await client.aclose()

    app = FastAPI(
        title="OpenAI organization usage/cost API key name proxy",
        version="0.1.0",
        docs_url=None,
        redoc_url=None,
        lifespan=lifespan,
    )

    @app.get("/health")
    async def health() -> dict[str, str]:
        return {"status": "ok"}

    @app.api_route(
        "/v1/organization/{organization_path:path}",
        methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "HEAD"],
    )
    async def proxy_organization_request(organization_path: str, request: Request) -> Response:
        client: httpx.AsyncClient = app.state.http_client
        request_label = f"{request.method} /v1/organization/{organization_path}"
        upstream_path = f"/v1/organization/{organization_path}"
        upstream_query = rewrite_group_by_query(request.query_params.multi_items())
        upstream_url = upstream_path if not upstream_query else f"{upstream_path}?{upstream_query}"
        try:
            upstream_response = await client.request(
                request.method,
                upstream_url,
                headers=forward_request_headers(request),
                content=await request.body(),
            )
        except httpx.TimeoutException:
            logger.warning("%s: upstream request timed out", request_label)
            return openai_style_proxy_error(
                status_code=504,
                code="upstream_timeout",
                message="The proxy timed out while waiting for OpenAI.",
            )
        except httpx.TransportError as error:
            logger.warning("%s: upstream transport error: %s", request_label, error)
            return openai_style_proxy_error(
                status_code=502,
                code="upstream_transport_error",
                message="The proxy could not connect to OpenAI.",
            )
        logger.info("%s: OpenAI returned HTTP %s", request_label, upstream_response.status_code)
        # Non-JSON or unparseable bodies are relayed untouched.
        if not is_json_response(upstream_response):
            return raw_upstream_response(upstream_response)
        try:
            upstream_payload = upstream_response.json()
        except json.JSONDecodeError:
            logger.warning("%s: OpenAI returned an invalid JSON body", request_label)
            return raw_upstream_response(upstream_response)
        amended_payload, stats = appender.append_names(upstream_payload, request_label=request_label)
        if stats.recognized:
            logger.info(
                "%s: appended %s to %s item(s): %s named, %s unknown",
                request_label,
                PROXY_NAME_FIELD,
                stats.recognized,
                stats.named,
                stats.unknown,
            )
        else:
            logger.info("%s: no %s fields recognized in response", request_label, API_KEY_ID_FIELD)
        return JSONResponse(
            status_code=upstream_response.status_code,
            content=amended_payload,
            headers=forward_response_headers(upstream_response),
        )

    return app


def rewrite_group_by_query(query_items: Iterable[tuple[str, str]]) -> str:
    rewritten: list[tuple[str, str]] = []
    for key, value in query_items:
        if key in {"group_by", "group_by[]"}:
            rewritten.append((key, rewrite_group_by_value(value)))
        else:
            rewritten.append((key, value))
    return urlencode(rewritten, doseq=True)


def rewrite_group_by_value(value: str) -> str:
    values = value.split(",")
    if len(values) == 1:
        return UPSTREAM_API_KEY_GROUP if value == PROXY_API_KEY_GROUP else value
    return ",".join(UPSTREAM_API_KEY_GROUP if item == PROXY_API_KEY_GROUP else item for item in values)


def forward_request_headers(request: Request) -> dict[str, str]:
    return {
        key: value
        for key, value in request.headers.items()
        if key.lower() not in REQUEST_HEADERS_TO_DROP
    }


def forward_response_headers(response: httpx.Response) -> dict[str, str]:
    return {
        key: value
        for key, value in response.headers.items()
        if key.lower() not in RESPONSE_HEADERS_TO_DROP
    }


def is_json_response(response: httpx.Response) -> bool:
    content_type = response.headers.get("content-type", "")
    return "application/json" in content_type.lower()


def raw_upstream_response(response: httpx.Response) -> Response:
    return Response(
        status_code=response.status_code,
        content=response.content,
        headers=forward_response_headers(response),
        media_type=response.headers.get("content-type"),
    )


def openai_style_proxy_error(*, status_code: int, code: str, message: str) -> JSONResponse:
    return JSONResponse(
        status_code=status_code,
        content={
            "error": {
                "message": message,
                "type": "proxy_error",
                "param": None,
                "code": code,
            }
        },
    )


def load_config(argv: list[str] | None = None) -> ProxyConfig:
    parser = argparse.ArgumentParser(
        description="Proxy OpenAI /v1/organization/* calls and append API key names to JSON results.",
    )
    parser.add_argument("--host", default=os.getenv("OPENAI_ORG_PROXY_HOST", "127.0.0.1"))
    parser.add_argument("--port", type=int, default=int(os.getenv("OPENAI_ORG_PROXY_PORT", "18080")))
    parser.add_argument("--openai-base-url", default=os.getenv("OPENAI_BASE_URL", OPENAI_BASE_URL))
    parser.add_argument(
        "--timeout",
        type=float,
        default=float(os.getenv("OPENAI_ORG_PROXY_TIMEOUT_SECONDS", "60")),
        help="Upstream request timeout in seconds.",
    )
    args = parser.parse_args(argv)
    return ProxyConfig(
        host=args.host,
        port=args.port,
        openai_base_url=args.openai_base_url.rstrip("/"),
        request_timeout_seconds=args.timeout,
    )


def configure_logging() -> None:
    logging.basicConfig(
        level=logging.INFO,
        stream=sys.stdout,
        format="%(asctime)s %(levelname)s %(name)s - %(message)s",
    )


async def run_server(config: ProxyConfig) -> None:
    server = uvicorn.Server(
        uvicorn.Config(
            create_app(config),
            host=config.host,
            port=config.port,
            log_config=None,
            access_log=False,
        )
    )
    await server.serve()


def main(argv: list[str] | None = None) -> None:
    configure_logging()
    config = load_config(argv)
    asyncio.run(run_server(config))


if __name__ == "__main__":
    main()

Note: this key-namer proxy is meant to run inside an organization’s network, on a high port number, as an edge exit. Do not expose it on a public IP: it relays calls without any authentication of its own.
Note2: it does not replace OpenAI’s obfuscated API key ID in the responses; it only adds a name alongside it. Replacing the ID outright is something you could do to make the proxy work with existing admin code.
Note3: I invested under 10 minutes; you should invest much more.
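For Note2, a minimal sketch of the replacing variant, assuming the same kind of ID-to-name map. `replace_key_ids` is a hypothetical helper and not part of the proxy code above; IDs with no match are left untouched so nothing is lost:

```python
from typing import Any

# Same demonstration shape as the proxy's map: key ID -> friendly name.
API_KEY_ID_TO_NAME = {
    "key_1234567890abcdef": "checkout-service-production",
    "key_abcdef1234567890": "mobile-app-production",
}


def replace_key_ids(value: Any, names: dict[str, str]) -> Any:
    """Return a copy of a JSON payload with known api_key_id values swapped for names."""
    if isinstance(value, list):
        return [replace_key_ids(item, names) for item in value]
    if isinstance(value, dict):
        swapped = {key: replace_key_ids(child, names) for key, child in value.items()}
        key_id = value.get("api_key_id")
        # Only overwrite when the ID is a string we actually have a name for.
        if isinstance(key_id, str) and key_id in names:
            swapped["api_key_id"] = names[key_id]
        return swapped
    return value


sample = {
    "data": [
        {"results": [
            {"api_key_id": "key_1234567890abcdef", "input_tokens": 120},
            {"api_key_id": "key_not_in_the_map", "input_tokens": 9},
        ]},
    ],
}

print(replace_key_ids(sample, API_KEY_ID_TO_NAME))
```

The trade-off is that existing admin code sees names in a field it expects to hold IDs, so anything that joins on the ID elsewhere would need the reverse lookup.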