From 1a86c3a485f1bb1935d9d106c16a41e7baab3646 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CQu?= Date: Sun, 19 Jan 2025 20:50:25 +0800 Subject: [PATCH] Fix Qwen.js to be compatible with OpenAI and add random backoff for rate limiting --- profiles/qwen.json | 4 +- src/agent/library/skill_library.js | 10 +- src/models/qwen.js | 177 ++++++++++------------------- 3 files changed, 70 insertions(+), 121 deletions(-) diff --git a/profiles/qwen.json b/profiles/qwen.json index 96f4757..f6a3f46 100644 --- a/profiles/qwen.json +++ b/profiles/qwen.json @@ -5,13 +5,13 @@ "model": { "api": "qwen", - "url": "https://dashscope-intl.aliyuncs.com/api/v1/services/aigc/text-generation/generation", + "url": "https://dashscope-intl.aliyuncs.com/compatible-mode/v1", "model": "qwen-max" }, "embedding": { "api": "qwen", - "url": "https://dashscope-intl.aliyuncs.com/api/v1/services/embeddings/text-embedding/text-embedding", + "url": "https://dashscope-intl.aliyuncs.com/compatible-mode/v1", "model": "text-embedding-v3" } } \ No newline at end of file diff --git a/src/agent/library/skill_library.js b/src/agent/library/skill_library.js index 356c614..a019112 100644 --- a/src/agent/library/skill_library.js +++ b/src/agent/library/skill_library.js @@ -8,12 +8,14 @@ export class SkillLibrary { this.skill_docs_embeddings = {}; } async initSkillLibrary() { - await Promise.all([ - ...getSkillDocs().map(async (doc) => { + const skillDocs = getSkillDocs(); + const embeddingPromises = skillDocs.map((doc) => { + return (async () => { let func_name_desc = doc.split('\n').slice(0, 2).join(''); this.skill_docs_embeddings[doc] = await this.embedding_model.embed(func_name_desc); - }) - ]); + })(); + }); + await Promise.all(embeddingPromises); } async getRelevantSkillDocs(message, select_num) { diff --git a/src/models/qwen.js b/src/models/qwen.js index d546298..20d5b49 100644 --- a/src/models/qwen.js +++ b/src/models/qwen.js @@ -1,134 +1,81 @@ -// This code uses Dashscope and HTTP to ensure the latest 
support for the Qwen model. -// Qwen is also compatible with the OpenAI API format; - -import { getKey } from '../utils/keys.js'; +import OpenAIApi from 'openai'; +import { getKey, hasKey } from '../utils/keys.js'; +import { strictFormat } from '../utils/text.js'; export class Qwen { - constructor(modelName, url) { - this.modelName = modelName; - this.url = url || 'https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation'; - this.apiKey = getKey('QWEN_API_KEY'); + constructor(model_name, url) { + this.model_name = model_name; + + let config = {}; + + config.baseURL = url || 'https://dashscope.aliyuncs.com/compatible-mode/v1'; + config.apiKey = getKey('QWEN_API_KEY'); + + this.openai = new OpenAIApi(config); } - async sendRequest(turns, systemMessage, stopSeq = '***', retryCount = 0) { - if (retryCount > 5) { - console.error('Maximum retry attempts reached.'); - return 'Error: Too many retry attempts.'; - } + async sendRequest(turns, systemMessage, stop_seq='***') { + let messages = [{'role': 'system', 'content': systemMessage}].concat(turns); - const data = { - model: this.modelName || 'qwen-plus', - input: { messages: [{ role: 'system', content: systemMessage }, ...turns] }, - parameters: { result_format: 'message', stop: stopSeq }, + messages = strictFormat(messages); + + const pack = { + model: this.model_name || "qwen-plus", + messages, + stop: stop_seq, }; - // Add default user message if all messages are 'system' role - if (turns.every((msg) => msg.role === 'system')) { - data.input.messages.push({ role: 'user', content: 'hello' }); - } - - if (!data.model || !data.input || !data.input.messages || !data.parameters) { - console.error('Invalid request data format:', data); - throw new Error('Invalid request data format.'); - } - + let res = null; try { - const response = await this._makeHttpRequest(this.url, data); - const choice = response?.output?.choices?.[0]; - - if (choice?.finish_reason === 'length' && turns.length > 0) { - return 
this.sendRequest(turns.slice(1), systemMessage, stopSeq, retryCount + 1); - } - - return choice?.message?.content || 'No content received.'; - } catch (err) { - console.error('Error occurred:', err); - return 'An error occurred, please try again.'; + console.log('Awaiting Qwen api response...') + // console.log('Messages:', messages); + let completion = await this.openai.chat.completions.create(pack); + if (completion.choices[0].finish_reason == 'length') + throw new Error('Context length exceeded'); + console.log('Received.') + res = completion.choices[0].message.content; } + catch (err) { + if ((err.message == 'Context length exceeded' || err.code == 'context_length_exceeded') && turns.length > 1) { + console.log('Context length exceeded, trying again with shorter context.'); + return await this.sendRequest(turns.slice(1), systemMessage, stop_seq); + } else { + console.log(err); + res = 'My brain disconnected, try again.'; + } + } + return res; } + // Why random backoff? + // With a 30 requests/second limit on Alibaba Qwen's embedding service, + // random backoff helps maximize bandwidth utilization. 
async embed(text) { - if (!text || typeof text !== 'string') { - console.error('Invalid embedding input: text must be a non-empty string:', text); - return 'Invalid embedding input: text must be a non-empty string.'; - } - - const data = { - model: this.modelName, - input: { texts: [text] }, - parameters: { text_type: 'query' }, - }; - - if (!data.model || !data.input || !data.input.texts || !data.parameters) { - console.error('Invalid embedding request data format:', data); - throw new Error('Invalid embedding request data format.'); - } - - try { - const response = await this._makeHttpRequest(this.url, data); - const embedding = response?.output?.embeddings?.[0]?.embedding; - - return embedding || 'No embedding result received.'; - } catch (err) { - console.log('Embed data:', data); - console.error('Embed error occurred:', err); - return 'An error occurred, please try again.'; - } - } - - async _makeHttpRequest(url, data, maxRetries = 10) { - const headers = { - 'Authorization': `Bearer ${this.apiKey}`, - 'Content-Type': 'application/json', - }; - - let retryCount = 0; - - while (retryCount < maxRetries) { + const maxRetries = 5; // Maximum number of retries + for (let retries = 0; retries < maxRetries; retries++) { try { - const response = await fetch(url, { - method: 'POST', - headers, - body: JSON.stringify(data), + const { data } = await this.openai.embeddings.create({ + model: this.model_name || "text-embedding-v3", + input: text, + encoding_format: "float", }); - - if (response.ok) { - const responseText = await response.text(); - try { - //Task completed successfully - return JSON.parse(responseText); - } catch (err) { - console.error('Failed to parse response JSON:', err); - throw new Error('Invalid response JSON format.'); - } - } else { - const errorText = await response.text(); - - if (response.status === 429 || response.statusText.includes('Too Many Requests')) { - // Handle rate limiting - retryCount++; - if (retryCount >= maxRetries) { - 
console.error('Exceeded maximum retry attempts, unable to get request result.');
-                        throw new Error(`Request failed after ${maxRetries} retries due to rate limiting.`);
-                    }
-                    //Reached Qwen concurrency limit, waiting in queue
-                    const waitTime = Math.random() * 1000; // Random wait between 0 to 1 seconds
-                    await new Promise(resolve => setTimeout(resolve, waitTime));
-                    continue; // Retry the request
-                } else {
-                    console.error(`Request failed, status code ${response.status}: ${response.statusText}`);
-                    console.error('Error response content:', errorText);
-                    throw new Error(`Request failed, status code ${response.status}: ${response.statusText}`);
-                }
-            }
+                return data[0].embedding;
             } catch (err) {
-                // Handle network errors or other exceptions
-                console.error('Error occurred during HTTP request:', err);
-                throw err; // Re-throw the error to be handled by the caller
+                if (err.status === 429) {
+                    // If a rate limit error occurs, back off exponentially (2^retries seconds) plus 0-2 seconds of random jitter
+                    const delay = Math.pow(2, retries) * 1000 + Math.floor(Math.random() * 2000);
+                    // console.log(`Rate limit hit, retrying in ${delay} ms...`);
+                    await new Promise(resolve => setTimeout(resolve, delay)); // Wait for the delay before retrying
+                } else {
+                    throw err;
+                }
             }
         }
-        // Exceeded maximum retries
-        console.error('Exceeded maximum retry attempts, unable to get request result.');
-        throw new Error(`Request failed after ${maxRetries} retries.`);
+        // If maximum retries are reached and the request still fails, throw an error
+        throw new Error('Max retries reached, request failed.');
     }
+
}
+
+
+