m***@163.com
m***@163.com
  • 发布:2023-06-26 20:11
  • 更新:2023-06-26 20:37
  • 阅读:322

SSEChannel的问题,急需解决

分类:uniCloud

产品分类: uniCloud/App

示例代码:

客户端代码

// 初始化sse通道  
let sseChannel = false  
// 引入markdown-it库  
import MarkdownIt from '@/lib/markdown-it.min.js';  

// hljs是由 Highlight.js 经兼容性修改后的文件,请勿直接升级。否则会造成uni-app-vue3-Android下有兼容问题  
import hljs from "@/lib/highlight/highlight-uni.min.js";  

const md = new MarkdownIt({  
    breaks: true, // 设置为true,即可实现冒号、分号、句号换行的效果  
    highlight: function(str, lang) {  
        if (lang && hljs.getLanguage(lang)) {  
            try {  
                return '<pre class="hljs"><code class="hljs ' + lang + '">' +  
                    hljs.highlight(lang, str, true).value +  
                    '</code></pre>';  
            } catch (__) {}  
        }  

        return '<pre class="hljs"><code>' + md.utils.escapeHtml(str) + '</code></pre>';  
    },  
    html: true  
})  

export default {  
    data() {  
        return {  
            // 记录流式响应次数  
            sseIndex: 0,  
            lastMsg: {  
                content: '',  
                markdownHtml: null,  
                reply: ''  
            },  
            status:'more'  
        }  
    },  
    methods: {  
        async setGPT(msg) {  
            sseChannel = new uniCloud.SSEChannel() // 创建消息通道  
            sseChannel.on('message', message => {  
                // 监听message事件  
                this.lastMsg.content += message  
                let html = md.render(this.lastMsg.content)  
                this.lastMsg.markdownHtml = html  
                // console.log(this.lastMsg.markdownHtml);  
                // 让流式响应计数值递增  
                this.sseIndex++  
            })  
            sseChannel.on('end', message => {  
                // 监听end事件,如果云端执行end时传了message,会在客户端end事件内收到传递的消息  
                console.log('on end', message)  
                this.status = 'no-more'  
                //执行SSE关闭事件  
                this.close()  
            })  
            await sseChannel.open() // 等待通道开启  
            this.status = 'loading'  
            const chatgpt = uniCloud.importObject('hszy-chatgpt', {  
                customUI: true  
            })  
            chatgpt  
                .send({  
                    messages: msg,  
                    sseChannel  
                }).then(result => {  
                    if (!sseChannel) {  
                        if (!result.data) {  
                            return  
                        }  
                        console.log(result, result.reply)  
                    }  
                })  
        },  
        close() {  
            if (sseChannel) {  
                sseChannel.close()  
                this.sseIndex = 0  
                sseChannel = false  
            }  
            this.status = 'more'  
        }  
    }  
}

云对象代码

// 云对象教程: https://uniapp.dcloud.net.cn/uniCloud/cloud-obj  
// jsdoc语法提示教程:https://ask.dcloud.net.cn/docs/#//ask.dcloud.net.cn/article/129  
// 引入utils模块中的safeRequire和checkContentSecurityEnable函数  
const {  
    safeRequire  
} = require('./utils')  
// 引入uni-config-center模块,并创建config对象  
const createConfig = safeRequire('uni-config-center')  
const config = createConfig({  
    pluginId: 'uni-ai-chat'  
}).config()  
const {  
    Configuration,  
    OpenAIApi  
} = require("openai");  
const {  
    parse  
} = require('path');  
// 设置密钥  
const configuration = new Configuration({  
    apiKey: 'sk-XXXXXXXXXXXXXXXXXXXX',  
    basePath: "https://XXXXXXXXX/openai/v1",  
});  
const openai = new OpenAIApi(configuration);  

function countOccurrences(bigString, smallString) {  
    const regex = new RegExp(smallString, 'g');  
    const matches = bigString.match(regex);  
    return matches ? matches.length : 0;  
}  

module.exports = {  
    _before: async function() {  
        // 这里是云函数的前置方法,你可以在这里加入你需要逻辑  

        // 判断否调用量本云对象的send方法  
        if (this.getMethodName() == 'send') {  
            // 从配置中心获取内容安全配置  
            // console.log('config.contentSecurity',config.contentSecurity);  
            if (config.contentSecurity) {  
                // 引入uni-sec-check模块  
                const UniSecCheck = safeRequire('uni-sec-check')  
                // 创建uniSecCheck对象  
                const uniSecCheck = new UniSecCheck({  
                    provider: 'mp-weixin',  
                    requestId: this.getUniCloudRequestId()  
                })  
                // 定义文本安全检测函数  
                this.textSecCheck = async (content) => {  
                    // 获取sseChannel  
                    let {  
                        sseChannel  
                    } = this.getParams()[0] || {}  
                    // 如果存在sseChannel,则抛出错误  
                    if (sseChannel) {  
                        throw {  
                            errSubject: 'uni-ai-chat',  
                            errCode: "sec-check",  
                            errMsg: "流式响应模式,内容安全识别功能无效"  
                        }  
                    }  
                    // 检测文本  
                    const checkRes = await uniSecCheck.textSecCheck({  
                        // 文本内容,不可超过500KB  
                        content,  
                        // 微信小程序端 开放的唯一用户标识符  
                        // openid,  
                        // 场景值(1 资料;2 评论;3 论坛;4 社交日志)  
                        scene: 1,  
                        // 接口版本号,可选1或2,但1的检测能力很弱  支持微信登录的项目,微信小程序端 可改用模式2 详情:https://uniapp.dcloud.net.cn/uniCloud/uni-sec-check.html#%E4%BD%BF%E7%94%A8%E5%89%8D%E5%BF%85%E7%9C%8B  
                        version: 2  
                    })  
                    console.log('checkRes检测文本', checkRes);  
                    // 如果检测到风险内容,则抛出错误  
                    if (checkRes.errCode === uniSecCheck.ErrorCode.RISK_CONTENT) {  
                        console.error({  
                            errCode: checkRes.errCode,  
                            errMsg: '文字存在风险',  
                            result: checkRes.result  
                        });  
                        throw "uni-sec-check:illegalData"  
                        // 如果检测出错,则抛出错误  
                    } else if (checkRes.errCode) {  
                        console.log(  
                            `其他原因导致此文件未完成自动审核(错误码:${checkRes.errCode},错误信息:${checkRes.errMsg}),需要人工审核`  
                        );  
                        console.error({  
                            errCode: checkRes.errCode,  
                            errMsg: checkRes.errMsg,  
                            result: checkRes.result  
                        });  
                        throw "uni-sec-check:illegalData"  
                    }  
                }  

                // 获取messages参数  
                let {  
                    messages  
                } = this.getParams()[0] || {  
                    "messages": []  
                }  
                // 将messages中的content拼接成字符串  
                let contentString = messages.map(i => i.content).join(' ')  
                console.log('contentString', contentString);  
                // 对contentString进行文本安全检测  
                await this.textSecCheck(contentString)  
            }  
        }  
    },  
    async _after(error, result) {  
        // 打印错误和结果  
        // console.log('_after',{error,result});   
        // 如果有错误  
        if (error) {  
            // 如果是内容安全检测错误  
            if (error.errCode == 60004 || error == "uni-sec-check:illegalData") {  
                // 返回一个包含敏感内容提示和标记的响应体  
                return {  
                    "data": {  
                        "reply": "内容涉及敏感",  
                        "illegal": true  
                    },  
                    "errCode": 0  
                }  
            }  
            // 其他符合响应体规范的错误,直接返回  
            else if (error.errCode && error.errMsg) {  
                return error  
            } else {  
                // 如果是其他错误  
                throw error // 直接抛出异常  
            }  
        }  

        // 如果是send方法且开启了内容安全检测  
        if (this.getMethodName() == 'send' && config.contentSecurity) {  
            try {  
                // 对回复内容进行文本安全检测  
                await this.textSecCheck(result.data.reply)  
            } catch (e) {  
                // 如果检测到敏感内容 返回一个包含敏感内容提示和标记的响应体  
                return {  
                    "data": {  
                        "reply": "内容涉及敏感",  
                        "illegal": true  
                    },  
                    "errCode": 0  
                }  
            }  
        }  
        // 返回处理后的结果  
        return result  
    },  

    // 发送消息  
    async send({  
        // 消息内容  
        messages,  
        // sse渠道对象  
        sseChannel  
    }) {  
        // 校验客户端提交的参数  
        // 检查消息是否符合规范  
        let res = checkMessages(messages)  
        if (res.errCode) {  
            throw new Error(res.errMsg)  
        }  

        try {  
            let reply = ""  
            return new Promise(async (resolve, reject) => {  
                // 反序列化sseChannel  
                const channel = uniCloud.deserializeSSEChannel(sseChannel)  

                const res = await openai.createChatCompletion({  
                    model: "gpt-3.5-turbo-16k",  
                    messages,  
                    presence_penalty: 0,  
                    stream: true,  
                    temperature: 0.5,  
                    frequency_penalty: 0  
                }, {  
                    responseType: "stream"  
                });  
                res.data.on("data", async (data) => {  
                    try {  
                        let message = data.toString().trim();  
                        message = message.replace('data: [DONE]', '');  
                        message = message.replace(/data:/g, ",data:");  

                        const occurrences = countOccurrences(message, 'data:');  

                        if (occurrences != 1) {  
                            let parts = message.split(",data:");  
                            let result = parts.map(part => {  
                                if (part.trim() !== "") {  
                                    return JSON.parse(part.trim());  
                                }  
                            }).filter(Boolean);  

                            for await (let item of result) {  
                                if (item.choices[0].finish_reason == "stop") {  
                                    await channel.end();  
                                    resolve({  
                                        errCode: 0,  
                                        content: reply,  
                                        role: "assistant"  
                                    });  
                                } else {  
                                    const token = item.choices[0].delta.content;  
                                    reply += token;  
                                    console.log(token);  
                                    await channel.write(token);  
                                }  
                            }  

                        } else {  
                            message = message.replace(/^,data:/, '');  
                            const parsed = JSON.parse(message);  
                            if (parsed.choices[0].finish_reason == "stop") {  
                                await channel.end();  
                                resolve({  
                                    errCode: 0,  
                                    "content": reply,  
                                    "role": "assistant"  
                                });  
                            } else {  
                                const token = parsed.choices[0].delta.content;  
                                reply += token;  
                                console.log(token);  
                                await channel.write(token);  
                            }  
                        }  
                    } catch (e) {  
                        reject(e);  
                    }  
                });  

                // 返回错误  
                res.data.on('error', async (error) => {  
                    console.error('---error----', error)  
                    reject(error)  
                })  
            })  

        } catch (e) {  
            console.log(e);  
            //TODO handle the exception  
            return {  
                errCode: 500  
            }  
        }  

        /**  
         * 校验消息内容是否符合规范  
         * @param {Array} messages - 消息列表  
         * @returns {Object} - 返回校验结果  
         */  
        function checkMessages(messages) {  
            try {  
                // 如果messages未定义  
                if (messages === undefined) {  
                    // 抛出异常  
                    throw "messages为必传参数"  
                    // 如果messages不是数组  
                } else if (!Array.isArray(messages)) {  
                    // 抛出异常  
                    throw "参数messages的值类型必须是[object,object...]"  
                } else {  
                    // 否则 遍历messages  
                    messages.forEach(item => {  
                        // 如果item不是对象  
                        if (typeof item != 'object') {  
                            // 抛出异常  
                            throw "参数messages的值类型必须是[object,object...]"  
                        }  
                        // 定义itemRoleArr数组  
                        let itemRoleArr = ["assistant", "user", "system"]  
                        // 如果item的role属性不在itemRoleArr数组中  
                        if (!itemRoleArr.includes(item.role)) {  
                            // 抛出异常  
                            throw "参数messages[{role}]的值只能是:" + itemRoleArr.join('或')  
                        }  
                        // 如果item的content属性不是字符串  
                        if (typeof item.content != 'string') {  
                            // 抛出异常  
                            throw "参数messages[{content}]的值类型必须是字符串"  
                        }  
                    })  
                }  
                // 返回校验结果  
                return {  
                    errCode: 0,  
                }  
                // 捕获异常  
            } catch (errMsg) {  
                // 返回异常信息  
                return {  
                    errSubject: 'ai-demo',  
                    errCode: 'param-error',  
                    errMsg  
                }  
            }  
        }  
    }  
}

操作步骤:

客户端、服务端代码已粘贴,业务场景是写文章,保证响应的数据足够大,响应时间能超过60s

预期结果:

实际结果:

bug描述:

1、在本地云函数,请求openai,响应流测试没有问题,但当使用SSEChannel向客户端推送数据,当响应大约63秒64秒左右的时间,响应数据就会中断
2、有采用uniapp官方的uni-ai-chat,也是存在这个问题,于是手写了下直接请求openai的方式,通过打印,发现问题出现在推送通道上,云函数超过一分钟,推送通道是可以保持继续执行的,但继续推送几秒钟后,直接就休止了(采用的model是“gpt-3.5-turbo-16k”),即不执行err的监听,也不执行end的监听。

2023-06-26 20:11 负责人:无 分享
已邀请:
DCloud_uniCloud_JSON

DCloud_uniCloud_JSON

云端云函数超时时间是60秒。你说的63秒64秒,应该是在云函数超时之前,发出的所有网络请求,在那个时间响应结束。

如果你有超过60秒的需求,应该在客户端再发起一次请求,原理类似:判断是超时结束的就自动让客户端再发个“继续”

  • m***@163.com (作者)

    是的,我们的业务场景是用chatgpt写文章,超出60s很正常,那这样的话,用nodejs写个OpenAI后端服务会不会更好

    2023-06-27 09:13

  • DCloud_uniCloud_JSON

    回复 m***@163.com: uni-ai计费网关可以超过60秒

    2023-06-27 11:59

要回复问题请先登录注册