
ChatBot Q&A / Conversation API Documentation

1. API Overview

This API wraps the official OpenAI SDK (a general-purpose LLM API) to provide intelligent Q&A and conversation services, with support for multi-turn context and knowledge-base retrieval.

1.1 Basic Information

  • Underlying SDK: OpenAI Python/Node.js SDK
  • Request method: POST
  • Data format: JSON
  • Response type: Stream/Regular
  • Character encoding: UTF-8
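The basics above (POST, JSON, UTF-8) amount to a simple wire format. A minimal sketch of the request body; the helper function is illustrative only, not part of any SDK:

```python
import json

def build_chat_request(question: str, stream: bool = False) -> dict:
    """Assemble the JSON body for a POST to /chat/completions."""
    return {
        "model": "gpt-4-turbo",
        "messages": [{"role": "user", "content": question}],
        "stream": stream,
    }

# UTF-8-encoded JSON body, ready to send with Content-Type: application/json
body = json.dumps(build_chat_request("Hello"), ensure_ascii=False).encode("utf-8")
```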

1.2 Official Resources

2. Environment Setup

2.1 Python

bash
pip install openai

2.2 Node.js

bash
npm install openai

3. Request Format

3.1 Single-Turn Q&A Request

python
from openai import OpenAI

client = OpenAI(
    api_key="your-api-key",
    base_url="https://your-domain/v1"  # optional: use a custom endpoint
)

response = client.chat.completions.create(
    model="gpt-4-turbo",
    messages=[
        {"role": "user", "content": "Your question here"}
    ],
    temperature=0.7,
    stream=True
)

3.2 Multi-Turn Conversation Request

python
response = client.chat.completions.create(
    model="gpt-4-turbo",
    messages=[
        {"role": "system", "content": "You are an IELTS writing assistant"},
        {"role": "user", "content": "First question"},
        {"role": "assistant", "content": "First answer"},
        {"role": "user", "content": "Second question"}
    ],
    temperature=0.7,
    stream=True
)
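Multi-turn context is simply the growing messages array: after each exchange, append the user turn and the assistant's reply before sending the next request. A minimal helper (an illustration, not part of the SDK) that reproduces the messages array above:

```python
from typing import Dict, List

def append_turn(history: List[Dict[str, str]], question: str, answer: str) -> List[Dict[str, str]]:
    """Record one completed user/assistant exchange in the running history."""
    history.append({"role": "user", "content": question})
    history.append({"role": "assistant", "content": answer})
    return history

history = [{"role": "system", "content": "You are an IELTS writing assistant"}]
append_turn(history, "First question", "First answer")
history.append({"role": "user", "content": "Second question"})
# `history` now has the same shape as the messages list shown above
```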

3.3 Parameters

Parameter         | Type    | Required | Description
------------------|---------|----------|----------------------------------------------
model             | string  | Yes      | Model to use
messages          | array   | Yes      | Conversation history
temperature       | float   | No       | Sampling temperature (0–2); default 0.7
stream            | boolean | No       | Whether to stream the response; default false
max_tokens        | integer | No       | Maximum number of tokens to generate
presence_penalty  | float   | No       | Presence penalty (-2.0 to 2.0)
frequency_penalty | float   | No       | Frequency penalty (-2.0 to 2.0)
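The numeric ranges in the parameter table can be enforced locally before a request is sent. This validator is a sketch of that idea, not an SDK feature:

```python
def validate_chat_params(temperature: float = 0.7,
                         presence_penalty: float = 0.0,
                         frequency_penalty: float = 0.0) -> dict:
    """Reject out-of-range sampling parameters before they reach the API."""
    if not 0 <= temperature <= 2:
        raise ValueError("temperature must be in [0, 2]")
    for name, value in (("presence_penalty", presence_penalty),
                        ("frequency_penalty", frequency_penalty)):
        if not -2.0 <= value <= 2.0:
            raise ValueError(f"{name} must be in [-2.0, 2.0]")
    return {"temperature": temperature,
            "presence_penalty": presence_penalty,
            "frequency_penalty": frequency_penalty}
```

The returned dict can be passed straight into `client.chat.completions.create(**params, ...)`.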

4. Code Examples

4.1 Python Example - Streaming Response

python
from openai import OpenAI
from typing import Dict, List, Optional

class ChatBot:
    def __init__(self, api_key: str, base_url: Optional[str] = None):
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url
        )

    def chat_stream(self, messages: List[Dict[str, str]]) -> Optional[str]:
        """Streamed chat: print chunks as they arrive and return the full text."""
        try:
            response = self.client.chat.completions.create(
                model="gpt-4-turbo",
                messages=messages,
                stream=True
            )

            # Accumulate the streamed chunks while printing them
            parts = []
            for chunk in response:
                if chunk.choices[0].delta.content:
                    print(chunk.choices[0].delta.content, end="", flush=True)
                    parts.append(chunk.choices[0].delta.content)
            return "".join(parts)

        except Exception as e:
            print(f"Error: {str(e)}")
            return None

    def chat_complete(self, messages: List[Dict[str, str]]) -> Optional[str]:
        """Non-streamed chat: return the complete answer at once."""
        try:
            response = self.client.chat.completions.create(
                model="gpt-4-turbo",
                messages=messages,
                stream=False
            )
            return response.choices[0].message.content

        except Exception as e:
            print(f"Error: {str(e)}")
            return None

# Usage example
if __name__ == "__main__":
    bot = ChatBot(api_key="your-api-key")

    # Sample conversation
    messages = [
        {"role": "system", "content": "You are an IELTS writing assistant"},
        {"role": "user", "content": "Please analyze the grammar errors in this essay"}
    ]

    # Streamed chat
    print("Streamed response:")
    bot.chat_stream(messages)

    # Non-streamed chat
    print("\n\nComplete response:")
    response = bot.chat_complete(messages)
    print(response)

4.2 Node.js Example

javascript
import OpenAI from 'openai';

class ChatBot {
    constructor(apiKey, baseURL = null) {
        this.client = new OpenAI({
            apiKey: apiKey,
            baseURL: baseURL
        });
    }
    
    async chatStream(messages) {
        try {
            const stream = await this.client.chat.completions.create({
                model: "gpt-4-turbo",
                messages: messages,
                stream: true,
            });
            
            for await (const chunk of stream) {
                const content = chunk.choices[0]?.delta?.content || '';
                if (content) {
                    process.stdout.write(content);
                }
            }
        } catch (error) {
            console.error('Error:', error);
        }
    }
    
    async chatComplete(messages) {
        try {
            const response = await this.client.chat.completions.create({
                model: "gpt-4-turbo",
                messages: messages,
                stream: false,
            });
            return response.choices[0].message.content;
        } catch (error) {
            console.error('Error:', error);
            return null;
        }
    }
}

// Usage example
async function main() {
    const bot = new ChatBot('your-api-key');

    const messages = [
        { role: "system", content: "You are an IELTS writing assistant" },
        { role: "user", content: "Please analyze the grammar errors in this essay" }
    ];

    console.log('Streamed response:');
    await bot.chatStream(messages);

    console.log('\n\nComplete response:');
    const response = await bot.chatComplete(messages);
    console.log(response);
}
}

main();

5. Error Handling

5.1 Common Error Codes

Code | Description           | Recommendation
-----|-----------------------|----------------------------------------
401  | Authentication failed | Check your API key
429  | Too many requests     | Add rate limiting and a retry mechanism
500  | Server error          | Retry with backoff
503  | Service unavailable   | Implement a service-degradation fallback

5.2 Error Handling Example

python
from openai import OpenAI
from openai import APIError, APIConnectionError, RateLimitError
import functools
import time

def retry_with_exponential_backoff(
    func,
    initial_delay: float = 1,
    exponential_base: float = 2,
    max_retries: int = 3
):
    """Decorator that retries transient API errors with exponential backoff."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        delay = initial_delay

        for i in range(max_retries):
            try:
                return func(*args, **kwargs)

            except (RateLimitError, APIError, APIConnectionError):
                # Re-raise on the final attempt; otherwise wait and retry
                if i == max_retries - 1:
                    raise
                time.sleep(delay)
                delay *= exponential_base

    return wrapper

@retry_with_exponential_backoff
def chat_with_retry(client, messages):
    return client.chat.completions.create(
        model="gpt-4-turbo",
        messages=messages
    )

6. Best Practices

6.1 Performance

  1. Use streaming responses to improve perceived latency
  2. Manage message history to keep the context window bounded
  3. Tune temperature and max_tokens for your use case
  4. Cache results for repeated queries
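The caching point above can be sketched as an in-memory store keyed by the request payload, so identical repeated requests skip the API call. This is a local illustration; a production cache would add TTLs and size bounds:

```python
import hashlib
import json

class ResponseCache:
    """In-memory cache of completions, keyed by (model, messages)."""

    def __init__(self):
        self._store = {}

    def _key(self, model, messages) -> str:
        # Canonical JSON so equivalent requests hash identically
        payload = json.dumps({"model": model, "messages": messages},
                             sort_keys=True, ensure_ascii=False)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def get(self, model, messages):
        return self._store.get(self._key(model, messages))

    def put(self, model, messages, answer: str):
        self._store[self._key(model, messages)] = answer
```

Check the cache before calling `chat.completions.create`, and `put` the answer afterwards.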

6.2 Cost Control

  1. Monitor token usage
  2. Estimate token counts before sending requests
  3. Set a sensible max_tokens limit
  4. Periodically prune stale conversation history

6.3 Security

  1. Never expose the API key on the client side
  2. Enforce request rate limits
  3. Filter and validate user input
  4. Log and monitor access
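The rate-limiting recommendation above can be sketched as a sliding-window limiter; a real deployment would keep one limiter per user or per API key:

```python
import time
from collections import deque
from typing import Optional

class RateLimiter:
    """Allow at most max_calls requests within any window of `window` seconds."""

    def __init__(self, max_calls: int, window: float):
        self.max_calls = max_calls
        self.window = window
        self._calls = deque()  # timestamps of accepted calls

    def allow(self, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        # Drop timestamps that have aged out of the window
        while self._calls and now - self._calls[0] >= self.window:
            self._calls.popleft()
        if len(self._calls) < self.max_calls:
            self._calls.append(now)
            return True
        return False
```

Call `allow()` before each API request and reject (or queue) the request when it returns False.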

7. Usage Limits

  1. Request rate: follows OpenAI's official limits
  2. Maximum context length: varies by model
  3. Streaming response timeout: 60 seconds recommended
  4. Non-streaming response timeout: 30 seconds recommended
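The recommended timeouts can be centralized in one place; the OpenAI Python client accepts a timeout at construction time (`OpenAI(timeout=...)`) or per request via `client.with_options(timeout=...)`. The helper below is a local convention, not an SDK feature:

```python
def pick_timeout(stream: bool) -> float:
    """Return the recommended client timeout in seconds for a request mode."""
    return 60.0 if stream else 30.0
```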

8. FAQ

8.1 How do I manage conversation history?

python
class ConversationManager:
    def __init__(self, max_history: int = 10):
        self.messages = []
        self.max_history = max_history

    def add_message(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        # Keep the history within the limit
        if len(self.messages) > self.max_history:
            # Preserve system messages
            system_messages = [m for m in self.messages if m["role"] == "system"]
            other_messages = [m for m in self.messages if m["role"] != "system"]
            # Keep only the most recent non-system messages
            recent_messages = other_messages[-self.max_history:]
            self.messages = system_messages + recent_messages

    def get_messages(self):
        return self.messages

    def clear_history(self):
        # Preserve system messages
        self.messages = [m for m in self.messages if m["role"] == "system"]

8.2 How do I estimate token counts?

python
import tiktoken

def num_tokens_from_messages(messages, model="gpt-4-turbo"):
    """Approximate token count; the exact per-message overhead varies by model."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Fall back to the encoding used by GPT-4-family models
        encoding = tiktoken.get_encoding("cl100k_base")
    num_tokens = 0
    for message in messages:
        num_tokens += 4  # per-message overhead
        for key, value in message.items():
            num_tokens += len(encoding.encode(value))
            if key == "name":  # if a name field is present
                num_tokens -= 1  # role and name share a token
    num_tokens += 2  # reply-priming overhead
    return num_tokens

8.3 How do I implement retries?

See the retry_with_exponential_backoff decorator above.

8.4 How do I handle streaming responses?

See the chat_stream method above.