cnllm-skill
A Python library for OpenAI-compatible Chinese LLM APIs. Use this when users need per-request batch configuration, automated streaming accumulation, streaming structural overview, vendor-native parameter validation with feedback, memory control, configurable failure policy, and batch task progress tracking. Replaces OpenAI SDK/LiteLLM for multi-model workflows.
Overview
CNLLM: Chinese LLM Unified Adapter
When to Use This Skill
- Multi-model workflows – call different Chinese LLMs (DeepSeek, GLM, Qwen, etc.) through one unified interface.
- Per-request batch – configure distinct parameters (`model`, `thinking`, `tools`, `stream`) for each request in a batch.
- Automated streaming accumulation – real-time access to `reasoning_content`, `content`, and `tool_calls` via properties (`.think`/`.still`/`.tools`).
- Streaming structural overview – inspect the streaming process via a structured view (`.repr`).
- Parameter validation – explicit feedback on unsupported parameters, with configurable handling (`drop_params`).
- Memory control – manage memory usage with `keep`.
- Batch progress tracking – monitor batch task progress via `.status`.
Vendor Support
DeepSeek, GLM (Zhipu), KIMI (Moonshot), MiniMax, Doubao (ByteDance), Xiaomi mimo, Qwen (Alibaba), Ernie (Baidu), Hunyuan (Tencent). For the full model list, see docs/model_list.md.
Installation & Version
pip install cnllm
For the latest features, ensure version >=0.9.3post3.
Basic Principles
1. Initialize the client
import os
from cnllm import CNLLM
client = CNLLM(model="deepseek-chat", api_key=os.getenv("DEEPSEEK_KEY"))  # uses the default base_url: the vendor's OpenAI-compatible URL
resp = client.chat.create(prompt="Hello, world!", model="deepseek-v4-pro")  # call-time model overrides the client's model
Note: Initialize the client before making calls. Parameters passed at call time override the same parameters set on the client.
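The override rule in the note can be pictured as a plain dictionary merge. The sketch below is only an illustration of that precedence, not CNLLM's internal code: `resolve_params` and the sample values are hypothetical, and treating `None` as "not provided" is an assumption.

```python
def resolve_params(client_defaults: dict, call_params: dict) -> dict:
    """Merge parameters so that call-time values win over client-level ones."""
    # Assumption: a call-time value of None means "not provided".
    provided = {key: value for key, value in call_params.items() if value is not None}
    # Later keys overwrite earlier ones, so call-time values take precedence.
    return {**client_defaults, **provided}


client_defaults = {"model": "deepseek-chat", "timeout": 60}
call_params = {"model": "deepseek-v4-pro", "stream": None}

effective = resolve_params(client_defaults, call_params)
print(effective)
```

Here the call-time `model` replaces the client-level one, while `timeout` is inherited from the client because the call did not set it.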
2. Unified Parameters
Use OpenAI-standard parameters. Vendor-native parameters can be set alongside them when needed; if the vendor supports them, they are passed through as-is (no `extra_body` required).
3. HTTP Control Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| `timeout` | int | 60 | Request timeout |
| `max_retries` | int | 3 | Maximum retry count |
| `retry_delay` | float | 1.0 | Retry delay |
| `max_concurrent` | int | 3 (chat) / 12 (embeddings) | Maximum concurrent requests, batch only |
| `rps` | int | 2 (chat) / 10 (embeddings) | Maximum requests per second, batch only |
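To make the roles of these parameters concrete, here is a minimal standard-library sketch of what such controls typically do. It is not CNLLM's implementation: `call_with_retries` and `run_batch` are hypothetical helpers, and the fixed retry delay, the reading of `max_retries` as "retries after the first attempt", and the evenly spaced start slots for `rps` are all assumptions.

```python
import asyncio
import time


def call_with_retries(fn, max_retries=3, retry_delay=1.0):
    """Call fn(); on failure, retry up to max_retries more times (assumed semantics)."""
    retries_used = 0
    while True:
        try:
            return fn()
        except Exception:
            if retries_used >= max_retries:
                raise  # retries exhausted: surface the last error
            retries_used += 1
            time.sleep(retry_delay)  # fixed delay between attempts (assumption)


async def run_batch(jobs, max_concurrent=3, rps=2):
    """Run coroutine factories with a concurrency cap and a start-rate cap."""
    loop = asyncio.get_running_loop()
    semaphore = asyncio.Semaphore(max_concurrent)
    interval = 1.0 / rps  # minimum spacing between request starts
    next_start = loop.time()

    async def worker(job):
        nonlocal next_start
        async with semaphore:  # at most max_concurrent jobs hold a slot
            start_at = max(loop.time(), next_start)  # reserve the next start slot
            next_start = start_at + interval
            await asyncio.sleep(max(0.0, start_at - loop.time()))
            return await job()

    # gather() returns results in the same order as the submitted jobs.
    return await asyncio.gather(*(worker(job) for job in jobs))


# Demo 1: a call that fails twice, then succeeds on the third attempt.
attempts = {"count": 0}


def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


value = call_with_retries(flaky, max_retries=3, retry_delay=0.0)

# Demo 2: six fake requests, at most two in flight at any moment.
stats = {"active": 0, "peak": 0}


def make_job(number):
    async def job():
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for network latency
        stats["active"] -= 1
        return number * 2
    return job


results = asyncio.run(run_batch([make_job(n) for n in range(6)], max_concurrent=2, rps=100))
print(value, results, stats["peak"])
```

The semaphore bounds how many requests are in flight, while the reserved start slots bound how quickly new requests begin; the two limits are independent, which is why the table lists them separately.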
Chat Examples
1. Single Chat with Streaming Incremental Access and Live View
import os
from cnllm import CNLLM, ToolCollector
client = CNLLM(model="deepseek-chat", api_key=os.getenv("DEEPSEEK_KEY"))
resp = client.chat.create(messages=[{"role": "user", "content": "Hello"}],
stream=True, thinking=True, tools=[...])
# ── Streaming: iterate ALL chunks to consume the stream ──
# chunk.* returns per-frame delta (mandatory)
# with resp as view: terminal live dict (optional)
frontend_content = []
frontend_reasoning = []
frontend_tools = ToolCollector() # auto-merge tool_calls by index
with resp as view:
    for chunk in resp:
        frontend_content.append(chunk.still)
        frontend_reasoning.append(chunk.think)
        frontend_tools.update(chunk.tools)  # List[Dict] per-frame delta
        view.refresh()
# ── After stream: resp.* returns fully accumulated results ──
print(resp.still) # fully accumulated content
print(resp.think) # fully accumulated reasoning
print(resp.tools) # List[Dict], OpenAI standard format, no index
print(resp) # final merged dict
# ── Build next-turn conversation context ──
from cnllm import ContextBox
messages += ContextBox(resp.still, resp.think, resp.tools,
executor=execute_tool) # define your own tool executor function
# → assistant message + tool_calls attached + tool results appended
2. Streaming Batch with Per-Request Incremental Access and Live View
from collections import defaultdict
frontend = defaultdict(list)  # per-request buffers for streamed deltas
resp = client.chat.batch(
    prompt=["Weather in Beijing", "Weather in Shanghai"],
    stream=True,
)
with resp as view:  # terminal live view: {status + usage}
    for chunk in resp:
        rid = chunk["request_id"]
        frontend[rid].append(chunk.still)  # route delta to per-request panel
        view.refresh()
print(resp.still)  # {"request_0": "...", "request_1": "..."}
print(resp.tools)  # {"request_0": [{tool_call}], "request_1": []}, List[Dict] per request
3. Mixed Batch (stream + non-stream) with Live View
resp = client.chat.batch(requests=[
    {"prompt": "Weather", "stream": True, "tools": [weather_tool]},
    {"prompt": "1+1=?"},
    {"prompt": "Hello", "stream": True},
])
with resp as view:
    for chunk in resp:
        frontend[chunk["request_id"]].append(chunk.still)  # frontend: per-request buffers, as in the previous example
        view.refresh()
print(resp.still)  # content of all requests
Note: Streaming and mixed batches must be iterated to trigger request processing; resp.still, resp.think, and resp.tools are complete only after iteration finishes.
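The routing pattern used in the batch examples can be tried without a network connection. The sketch below replaces the real stream with a hypothetical generator, `fake_batch_stream`, whose chunks are plain dicts; real chunks are `StreamChunk` objects, so the dict shape here is a simplifying assumption. It also shows why iteration is required: a generator produces nothing until it is consumed.

```python
from collections import defaultdict


def fake_batch_stream():
    """Yield hypothetical per-frame deltas from two interleaved requests."""
    yield {"request_id": "request_0", "still": "Sunny "}
    yield {"request_id": "request_1", "still": "Rainy "}
    yield {"request_id": "request_0", "still": "in Beijing"}
    yield {"request_id": "request_1", "still": "in Shanghai"}


stream = fake_batch_stream()  # nothing has been produced yet: generators are lazy
panels = defaultdict(list)    # one buffer per request_id

for chunk in stream:          # iterating is what drives the stream forward
    panels[chunk["request_id"]].append(chunk["still"])

# Joining each buffer gives the same shape as the accumulated mapping.
accumulated = {rid: "".join(parts) for rid, parts in panels.items()}
print(accumulated)
```

Keying the buffers by `request_id` keeps interleaved deltas from different requests apart, which is what lets a frontend show one panel per request.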
4. Parameter Validation (drop_params) and Memory Control (keep)
resp = client.chat.batch(
    prompt=["A", "B"],
    drop_params="strict",
    keep=["results"],
)
Note:
- Set `drop_params="strict"` to reject unknown parameters (the request fails and the batch continues), use `warn` to warn and continue the request, or use `ignore` to silently drop them.
`keep` defaults:
| Call scenario | Kept by default | Released by default |
|---|---|---|
| `client.chat.batch()` | still/think/tools/status/usage | results/errors/raw |
| `client.embeddings.batch()` | vectors/status/usage/batch_info | results/errors |
During iteration, all fields are available regardless of `keep`.
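The three `drop_params` modes described in the note can be sketched as a small validation function. This is an illustration under stated assumptions, not the library's code: `apply_drop_params` and the `SUPPORTED` allow-list are hypothetical, and the sketch assumes that `warn` drops the unknown parameter after warning, which the note does not spell out.

```python
import warnings

# Hypothetical allow-list; a real one would depend on the vendor and model.
SUPPORTED = {"model", "messages", "stream", "temperature"}


def apply_drop_params(params: dict, mode: str = "warn") -> dict:
    """Validate params against SUPPORTED according to the chosen mode."""
    if mode not in {"strict", "warn", "ignore"}:
        raise ValueError(f"unknown mode: {mode!r}")
    unknown = sorted(set(params) - SUPPORTED)
    if unknown and mode == "strict":
        # strict: reject the request outright
        raise ValueError(f"unsupported parameters: {unknown}")
    if unknown and mode == "warn":
        # warn: report the problem, then continue (assumed to drop the parameter)
        warnings.warn(f"unsupported parameters: {unknown}")
    # warn and ignore both continue with only the supported parameters
    return {key: value for key, value in params.items() if key in SUPPORTED}


request = {"model": "deepseek-chat", "stream": True, "top_z": 1}

cleaned = apply_drop_params(request, mode="ignore")

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    warned = apply_drop_params(request, mode="warn")

try:
    apply_drop_params(request, mode="strict")
    strict_error = None
except ValueError as exc:
    strict_error = str(exc)

print(cleaned, len(caught), strict_error)
```

In a batch, the `strict` failure would be recorded for that one request while the others proceed, matching the "request fails and the batch continues" behavior above.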
Batch Response Structure
chat.batch():
{
  "status": {"elapsed": "3.42s", "success_count": 2, "fail_count": 1, "total": 3},  # Statistics
  "usage": {"prompt_tokens": 5, "total_tokens": 5},  # Batch processing total usage info
  "errors": {"request_2": "error message"},  # Mapping of all failed requests' request_id to error messages
  "results": {"request_0": {...}, "request_1": {...}},  # Mapping of all successful requests' request_id to standard responses
  "think": {"request_0": "...", "request_1": "..."},
  "still": {"request_0": "...", "request_1": "..."},
  "tools": {"request_0": [{tool_call}], "request_1": [{tool_call}]},  # List[Dict] per request
  "raw": {"request_0": {...}, "request_1": {...}}
}
embeddings.batch():
{
  "status": {"elapsed": "3.35s", "success_count": 1, "fail_count": 1, "total": 2},
  "batch_info": {"batch_size": 2, "batch_count": 2, "dimension": 1024},
  "usage": {"prompt_tokens": 5, "total_tokens": 5},
  "results": {"request_0": {...}, "request_1": {...}},
  "errors": {"request_2": "error message"},
  "vectors": {"request_0": [...], "request_1": [...]}  # Mapping of all successful requests' request_id to embedding vectors
}
Highlights
| Feature | Description | Import |
|---|---|---|
| `chunk.still` / `chunk.think` / `chunk.tools` | Per-frame delta during streaming | Built-in on StreamChunk |
| `ToolCollector` | Auto-merge tool_calls by index | `from cnllm import ToolCollector` |
| `ContextBox` | Build conversation context from resp.* | `from cnllm import ContextBox` |
| `with resp as view:` | Terminal live dict during streaming | Built-in on all responses |
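For readers curious what "merge tool_calls by index" and "build conversation context" involve, the sketch below reproduces both steps with plain dicts in the OpenAI chat format. It does not show how `ToolCollector` or `ContextBox` are implemented: `merge_tool_call_deltas`, `build_next_turn`, the `(name, args)` executor signature, and the sample data are hypothetical, and reasoning content is left out because how it is carried between turns varies by vendor.

```python
import json


def merge_tool_call_deltas(frames):
    """Merge per-frame tool_call deltas into complete calls, keyed by index."""
    merged = {}
    for frame in frames:
        for delta in frame:
            slot = merged.setdefault(
                delta["index"],
                {"id": "", "type": "function", "function": {"name": "", "arguments": ""}},
            )
            if delta.get("id"):
                slot["id"] = delta["id"]
            fn = delta.get("function", {})
            if fn.get("name"):
                slot["function"]["name"] = fn["name"]
            # Argument JSON arrives in fragments and must be concatenated.
            slot["function"]["arguments"] += fn.get("arguments") or ""
    # The final list is ordered by index but no longer carries the index field.
    return [merged[index] for index in sorted(merged)]


def build_next_turn(content, tool_calls, executor):
    """Return the assistant message plus one tool message per executed call."""
    assistant = {"role": "assistant", "content": content}
    if tool_calls:
        assistant["tool_calls"] = tool_calls
    messages = [assistant]
    for call in tool_calls:
        args = json.loads(call["function"]["arguments"] or "{}")
        result = executor(call["function"]["name"], args)
        messages.append(
            {"role": "tool", "tool_call_id": call["id"], "content": json.dumps(result)}
        )
    return messages


# Three streamed frames that together describe one tool call.
frames = [
    [{"index": 0, "id": "call_1", "function": {"name": "get_weather", "arguments": ""}}],
    [{"index": 0, "function": {"arguments": '{"city": '}}],
    [{"index": 0, "function": {"arguments": '"Beijing"}'}}],
]
tool_calls = merge_tool_call_deltas(frames)


def execute_tool(name, args):
    """Hypothetical executor that returns a canned forecast."""
    return {"tool": name, "city": args["city"], "forecast": "sunny"}


next_messages = build_next_turn("", tool_calls, execute_tool)
print(json.dumps(next_messages, indent=2))
```

The resulting list can be appended to the running `messages` so the next request sees both the assistant's tool call and the tool's result.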
Examples (See examples/ directory)
- `streaming_incremental.py` – single streaming with `chunk.*` + `resp as view`
- `batch_streaming.py` – batch streaming with `request_id` routing
- `mixed_batch.py` – mixed (stream + non-stream) batch
- `embeddings.py` – single/batch embedding requests
- `fallback.py` – `fallback_models` with detailed error handling
- `async_client.py` – async client usage with `asyncCNLLM`
- `langchain_integration.py` – LangChain Runnable integration
- `batch_customization.py` – `custom_ids` and `callbacks`
- `tool_chain.py` – multi-turn tool calling with `ContextBox`
References (See references/ directory)
- `model_list.md` – supported models
- `common_mistakes.md` – common mistakes
Install & Usage
mkdir -p .claude/agents
Add the configuration to .claude/agents/cnllm-skill.md
@cnllm-skill
Frequently Asked Questions
How to install cnllm-skill?
To install cnllm-skill, create the agents directory (mkdir -p .claude/agents), then add the config to .claude/agents/cnllm-skill.md. Finally, invoke @cnllm-skill in Claude Code.
What is cnllm-skill best for?
cnllm-skill is an agent categorized under General. It is designed for: api, python. Created by kanchengw.