Text Generation
MLX
Safetensors
Transformers
English
qwen3_moe
cdxgen
sbom
supply-chain-security
conversational
Instructions to use CycloneDX/cdx1-pro-mlx with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- MLX
How to use CycloneDX/cdx1-pro-mlx with MLX:
# Make sure mlx-lm is installed # pip install --upgrade mlx-lm # Generate text with mlx-lm from mlx_lm import load, generate model, tokenizer = load("CycloneDX/cdx1-pro-mlx") prompt = "Write a story about Einstein" messages = [{"role": "user", "content": prompt}] prompt = tokenizer.apply_chat_template( messages, add_generation_prompt=True ) text = generate(model, tokenizer, prompt=prompt, verbose=True) - Transformers
How to use CycloneDX/cdx1-pro-mlx with Transformers:
# Use a pipeline as a high-level helper from transformers import pipeline pipe = pipeline("text-generation", model="CycloneDX/cdx1-pro-mlx") messages = [ {"role": "user", "content": "Who are you?"}, ] pipe(messages)# Load model directly from transformers import AutoTokenizer, AutoModelForCausalLM tokenizer = AutoTokenizer.from_pretrained("CycloneDX/cdx1-pro-mlx") model = AutoModelForCausalLM.from_pretrained("CycloneDX/cdx1-pro-mlx") messages = [ {"role": "user", "content": "Who are you?"}, ] inputs = tokenizer.apply_chat_template( messages, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt", ).to(model.device) outputs = model.generate(**inputs, max_new_tokens=40) print(tokenizer.decode(outputs[0][inputs["input_ids"].shape[-1]:])) - Notebooks
- Google Colab
- Kaggle
- Local Apps
- LM Studio
- vLLM
How to use CycloneDX/cdx1-pro-mlx with vLLM:
Install from pip and serve model
# Install vLLM from pip: pip install vllm # Start the vLLM server: vllm serve "CycloneDX/cdx1-pro-mlx" # Call the server using curl (OpenAI-compatible API): curl -X POST "http://localhost:8000/v1/chat/completions" \ -H "Content-Type: application/json" \ --data '{ "model": "CycloneDX/cdx1-pro-mlx", "messages": [ { "role": "user", "content": "What is the capital of France?" } ] }'Use Docker
docker model run hf.co/CycloneDX/cdx1-pro-mlx
- SGLang
How to use CycloneDX/cdx1-pro-mlx with SGLang:
Install from pip and serve model
# Install SGLang from pip: pip install sglang # Start the SGLang server: python3 -m sglang.launch_server \ --model-path "CycloneDX/cdx1-pro-mlx" \ --host 0.0.0.0 \ --port 30000 # Call the server using curl (OpenAI-compatible API): curl -X POST "http://localhost:30000/v1/chat/completions" \ -H "Content-Type: application/json" \ --data '{ "model": "CycloneDX/cdx1-pro-mlx", "messages": [ { "role": "user", "content": "What is the capital of France?" } ] }'Use Docker images
docker run --gpus all \ --shm-size 32g \ -p 30000:30000 \ -v ~/.cache/huggingface:/root/.cache/huggingface \ --env "HF_TOKEN=<secret>" \ --ipc=host \ lmsysorg/sglang:latest \ python3 -m sglang.launch_server \ --model-path "CycloneDX/cdx1-pro-mlx" \ --host 0.0.0.0 \ --port 30000 # Call the server using curl (OpenAI-compatible API): curl -X POST "http://localhost:30000/v1/chat/completions" \ -H "Content-Type: application/json" \ --data '{ "model": "CycloneDX/cdx1-pro-mlx", "messages": [ { "role": "user", "content": "What is the capital of France?" } ] }' - Pi new
How to use CycloneDX/cdx1-pro-mlx with Pi:
Start the MLX server
# Install MLX LM: uv tool install mlx-lm # Start a local OpenAI-compatible server: mlx_lm.server --model "CycloneDX/cdx1-pro-mlx"
Configure the model in Pi
# Install Pi: npm install -g @mariozechner/pi-coding-agent # Add to ~/.pi/agent/models.json: { "providers": { "mlx-lm": { "baseUrl": "http://localhost:8080/v1", "api": "openai-completions", "apiKey": "none", "models": [ { "id": "CycloneDX/cdx1-pro-mlx" } ] } } }Run Pi
# Start Pi in your project directory: pi
- Hermes Agent new
How to use CycloneDX/cdx1-pro-mlx with Hermes Agent:
Start the MLX server
# Install MLX LM: uv tool install mlx-lm # Start a local OpenAI-compatible server: mlx_lm.server --model "CycloneDX/cdx1-pro-mlx"
Configure Hermes
# Install Hermes: curl -fsSL https://hermes-agent.nousresearch.com/install.sh | bash hermes setup # Point Hermes at the local server: hermes config set model.provider custom hermes config set model.base_url http://127.0.0.1:8080/v1 hermes config set model.default CycloneDX/cdx1-pro-mlx
Run Hermes
hermes
- MLX LM
How to use CycloneDX/cdx1-pro-mlx with MLX LM:
Generate or start a chat session
# Install MLX LM uv tool install mlx-lm # Interactive chat REPL mlx_lm.chat --model "CycloneDX/cdx1-pro-mlx"
Run an OpenAI-compatible server
# Install MLX LM uv tool install mlx-lm # Start the server mlx_lm.server --model "CycloneDX/cdx1-pro-mlx" # Calling the OpenAI-compatible server with curl curl -X POST "http://localhost:8000/v1/chat/completions" \ -H "Content-Type: application/json" \ --data '{ "model": "CycloneDX/cdx1-pro-mlx", "messages": [ {"role": "user", "content": "Hello"} ] }' - Docker Model Runner
How to use CycloneDX/cdx1-pro-mlx with Docker Model Runner:
docker model run hf.co/CycloneDX/cdx1-pro-mlx
| # SPDX-License-Identifier: Apache-2.0 | |
| import json | |
| import re | |
| import uuid | |
| from collections.abc import Sequence | |
| from typing import Union, Optional, Any, List, Dict | |
| from enum import Enum | |
| from vllm.entrypoints.openai.protocol import ( | |
| ChatCompletionRequest, | |
| ChatCompletionToolsParam, | |
| DeltaMessage, | |
| DeltaToolCall, | |
| DeltaFunctionCall, | |
| ExtractedToolCallInformation, | |
| FunctionCall, | |
| ToolCall, | |
| ) | |
| from vllm.entrypoints.openai.tool_parsers.abstract_tool_parser import ( | |
| ToolParser, | |
| ToolParserManager, | |
| ) | |
| from vllm.logger import init_logger | |
| from vllm.transformers_utils.tokenizer import AnyTokenizer | |
| logger = init_logger(__name__) | |
| class Qwen3XMLToolParser(ToolParser): | |
| def __init__(self, tokenizer: AnyTokenizer): | |
| super().__init__(tokenizer) | |
| self.current_tool_name_sent: bool = False | |
| self.prev_tool_call_arr: list[dict] = [] | |
| self.current_tool_id: int = -1 | |
| self.streamed_args_for_tool: list[str] = [] | |
| # Sentinel tokens for streaming mode | |
| self.tool_call_start_token: str = "<tool_call>" | |
| self.tool_call_end_token: str = "</tool_call>" | |
| self.tool_call_prefix: str = "<function=" | |
| self.function_end_token: str = "</function>" | |
| self.parameter_prefix: str = "<parameter=" | |
| self.parameter_end_token: str = "</parameter>" | |
| self.is_tool_call_started: bool = False | |
| self.failed_count: int = 0 | |
| # Enhanced streaming state - reset for each new message | |
| self._reset_streaming_state() | |
| # Regex patterns | |
| self.tool_call_complete_regex = re.compile( | |
| r"<tool_call>(.*?)</tool_call>", re.DOTALL | |
| ) | |
| self.tool_call_regex = re.compile( | |
| r"<tool_call>(.*?)</tool_call>|<tool_call>(.*?)$", re.DOTALL | |
| ) | |
| self.tool_call_function_regex = re.compile( | |
| r"<function=(.*?)</function>|<function=(.*)$", re.DOTALL | |
| ) | |
| self.tool_call_parameter_regex = re.compile( | |
| r"<parameter=(.*?)</parameter>|<parameter=(.*?)$", re.DOTALL | |
| ) | |
| if not self.model_tokenizer: | |
| raise ValueError( | |
| "The model tokenizer must be passed to the ToolParser " | |
| "constructor during construction." | |
| ) | |
| self.tool_call_start_token_id = self.vocab.get(self.tool_call_start_token) | |
| self.tool_call_end_token_id = self.vocab.get(self.tool_call_end_token) | |
| if self.tool_call_start_token_id is None or self.tool_call_end_token_id is None: | |
| raise RuntimeError( | |
| "Qwen3 XML Tool parser could not locate tool call start/end " | |
| "tokens in the tokenizer!" | |
| ) | |
| logger.info(f"vLLM Successfully import tool parser {self.__class__.__name__} !") | |
| def _generate_tool_call_id(self) -> str: | |
| """Generate a unique tool call ID.""" | |
| return f"call_{uuid.uuid4().hex[:24]}" | |
| def _reset_streaming_state(self): | |
| """Reset all streaming state.""" | |
| self.current_tool_index = 0 | |
| self.is_tool_call_started = False | |
| self.header_sent = False | |
| self.current_tool_id = None | |
| self.current_function_name = None | |
| self.current_param_name = None | |
| self.current_param_value = "" | |
| self.param_count = 0 | |
| self.in_param = False | |
| self.in_function = False | |
| self.accumulated_text = "" | |
| self.json_started = False | |
| self.json_closed = False | |
| def _parse_xml_function_call( | |
| self, function_call_str: str, tools: Optional[list[ChatCompletionToolsParam]] | |
| ) -> Optional[ToolCall]: | |
| def get_arguments_config(func_name: str) -> dict: | |
| if tools is None: | |
| return {} | |
| for config in tools: | |
| if not hasattr(config, "type") or not ( | |
| hasattr(config, "function") and hasattr(config.function, "name") | |
| ): | |
| continue | |
| if config.type == "function" and config.function.name == func_name: | |
| if not hasattr(config.function, "parameters"): | |
| return {} | |
| params = config.function.parameters | |
| if isinstance(params, dict) and "properties" in params: | |
| return params["properties"] | |
| elif isinstance(params, dict): | |
| return params | |
| else: | |
| return {} | |
| logger.warning(f"Tool '{func_name}' is not defined in the tools list.") | |
| return {} | |
| def convert_param_value( | |
| param_value: str, param_name: str, param_config: dict, func_name: str | |
| ) -> Any: | |
| # Handle null value for any type | |
| if param_value.lower() == "null": | |
| return None | |
| if param_name not in param_config: | |
| if param_config != {}: | |
| logger.warning( | |
| f"Parsed parameter '{param_name}' is not defined in the tool " | |
| f"parameters for tool '{func_name}', directly returning the string value." | |
| ) | |
| return param_value | |
| if ( | |
| isinstance(param_config[param_name], dict) | |
| and "type" in param_config[param_name] | |
| ): | |
| param_type = str(param_config[param_name]["type"]).strip().lower() | |
| else: | |
| param_type = "string" | |
| if param_type in ["string", "str", "text", "varchar", "char", "enum"]: | |
| return param_value | |
| elif ( | |
| param_type.startswith("int") | |
| or param_type.startswith("uint") | |
| or param_type.startswith("long") | |
| or param_type.startswith("short") | |
| or param_type.startswith("unsigned") | |
| ): | |
| try: | |
| param_value = int(param_value) | |
| except: | |
| logger.warning( | |
| f"Parsed value '{param_value}' of parameter '{param_name}' is not an integer in tool " | |
| f"'{func_name}', degenerating to string." | |
| ) | |
| return param_value | |
| elif param_type.startswith("num") or param_type.startswith("float"): | |
| try: | |
| float_param_value = float(param_value) | |
| param_value = float_param_value if float_param_value - int(float_param_value) != 0 else int(float_param_value) | |
| except: | |
| logger.warning( | |
| f"Parsed value '{param_value}' of parameter '{param_name}' is not a float in tool " | |
| f"'{func_name}', degenerating to string." | |
| ) | |
| return param_value | |
| elif param_type in ["boolean", "bool", "binary"]: | |
| param_value = param_value.lower() | |
| if param_value not in ["true", "false"]: | |
| logger.warning( | |
| f"Parsed value '{param_value}' of parameter '{param_name}' is not a boolean (`true` of `false`) in tool '{func_name}', degenerating to false." | |
| ) | |
| return param_value == "true" | |
| else: | |
| if param_type == "object" or param_type.startswith("dict"): | |
| try: | |
| param_value = json.loads(param_value) | |
| return param_value | |
| except: | |
| logger.warning( | |
| f"Parsed value '{param_value}' of parameter '{param_name}' is not a valid JSON object in tool " | |
| f"'{func_name}', will try other methods to parse it." | |
| ) | |
| try: | |
| param_value = eval(param_value) | |
| except: | |
| logger.warning( | |
| f"Parsed value '{param_value}' of parameter '{param_name}' cannot be converted via Python `eval()` in tool '{func_name}', degenerating to string." | |
| ) | |
| return param_value | |
| # Extract function name | |
| end_index = function_call_str.index(">") | |
| function_name = function_call_str[:end_index] | |
| param_config = get_arguments_config(function_name) | |
| parameters = function_call_str[end_index + 1 :] | |
| param_dict = {} | |
| for match in self.tool_call_parameter_regex.findall(parameters): | |
| match_text = match[0] if match[0] else match[1] | |
| idx = match_text.index(">") | |
| param_name = match_text[:idx] | |
| param_value = str(match_text[idx + 1 :]) | |
| # Remove prefix and trailing \n | |
| if param_value.startswith("\n"): | |
| param_value = param_value[1:] | |
| if param_value.endswith("\n"): | |
| param_value = param_value[:-1] | |
| param_dict[param_name] = convert_param_value( | |
| param_value, param_name, param_config, function_name | |
| ) | |
| return ToolCall( | |
| type="function", | |
| function=FunctionCall( | |
| name=function_name, arguments=json.dumps(param_dict, ensure_ascii=False) | |
| ), | |
| ) | |
| def _get_function_calls(self, model_output: str) -> List[str]: | |
| # Find all tool calls | |
| matched_ranges = self.tool_call_regex.findall(model_output) | |
| raw_tool_calls = [ | |
| match[0] if match[0] else match[1] for match in matched_ranges | |
| ] | |
| # Back-off strategy if no tool_call tags found | |
| if len(raw_tool_calls) == 0: | |
| raw_tool_calls = [model_output] | |
| raw_function_calls = [] | |
| for tool_call in raw_tool_calls: | |
| raw_function_calls.extend(self.tool_call_function_regex.findall(tool_call)) | |
| function_calls = [ | |
| match[0] if match[0] else match[1] for match in raw_function_calls | |
| ] | |
| return function_calls | |
| def extract_tool_calls( | |
| self, | |
| model_output: str, | |
| request: ChatCompletionRequest, | |
| ) -> ExtractedToolCallInformation: | |
| # Quick check to avoid unnecessary processing | |
| if self.tool_call_prefix not in model_output: | |
| return ExtractedToolCallInformation( | |
| tools_called=False, tool_calls=[], content=model_output | |
| ) | |
| try: | |
| function_calls = self._get_function_calls(model_output) | |
| if len(function_calls) == 0: | |
| return ExtractedToolCallInformation( | |
| tools_called=False, tool_calls=[], content=model_output | |
| ) | |
| tool_calls = [ | |
| self._parse_xml_function_call(function_call_str, request.tools) | |
| for function_call_str in function_calls | |
| ] | |
| # Populate prev_tool_call_arr for serving layer to set finish_reason | |
| self.prev_tool_call_arr.clear() # Clear previous calls | |
| for tool_call in tool_calls: | |
| if tool_call: | |
| self.prev_tool_call_arr.append( | |
| { | |
| "name": tool_call.function.name, | |
| "arguments": tool_call.function.arguments, | |
| } | |
| ) | |
| # Extract content before tool calls | |
| content_index = model_output.find(self.tool_call_start_token) | |
| content_index = ( | |
| content_index | |
| if content_index >= 0 | |
| else model_output.find(self.tool_call_prefix) | |
| ) | |
| content = model_output[:content_index] # .rstrip() | |
| return ExtractedToolCallInformation( | |
| tools_called=(len(tool_calls) > 0), | |
| tool_calls=tool_calls, | |
| content=content if content else None, | |
| ) | |
| except Exception: | |
| logger.exception("Error in extracting tool call from response.") | |
| return ExtractedToolCallInformation( | |
| tools_called=False, tool_calls=[], content=model_output | |
| ) | |
| def extract_tool_calls_streaming( | |
| self, | |
| previous_text: str, | |
| current_text: str, | |
| delta_text: str, | |
| previous_token_ids: Sequence[int], | |
| current_token_ids: Sequence[int], | |
| delta_token_ids: Sequence[int], | |
| request: ChatCompletionRequest, | |
| ) -> Union[DeltaMessage, None]: | |
| # If no delta text, return None unless it's an EOS token after tool calls | |
| if not delta_text: | |
| # Check if this is an EOS token after all tool calls are complete | |
| # We check for tool calls in the text even if is_tool_call_started is False | |
| # because it might have been reset after processing all tools | |
| if delta_token_ids and self.tool_call_end_token_id not in delta_token_ids: | |
| # Count complete tool calls | |
| complete_calls = len( | |
| self.tool_call_complete_regex.findall(current_text) | |
| ) | |
| # If we have completed tool calls and populated prev_tool_call_arr | |
| if complete_calls > 0 and len(self.prev_tool_call_arr) > 0: | |
| # Check if all tool calls are closed | |
| open_calls = current_text.count( | |
| self.tool_call_start_token | |
| ) - current_text.count(self.tool_call_end_token) | |
| if open_calls == 0: | |
| # Return empty delta message to allow finish_reason processing | |
| return DeltaMessage(content="") | |
| elif not self.is_tool_call_started and current_text: | |
| # This is a regular content response that's now complete | |
| return DeltaMessage(content="") | |
| return None | |
| # Check if this is the first call (reset state if needed) | |
| if not previous_text: | |
| self._reset_streaming_state() | |
| # Update accumulated text | |
| self.accumulated_text = current_text | |
| # Check if we need to advance to next tool | |
| if self.json_closed and not self.in_function: | |
| # Check if this tool call has ended | |
| tool_ends = current_text.count(self.tool_call_end_token) | |
| if tool_ends > self.current_tool_index: | |
| # This tool has ended, advance to next | |
| self.current_tool_index += 1 | |
| self.header_sent = False | |
| self.param_count = 0 | |
| self.json_started = False | |
| self.json_closed = False | |
| # Check if there are more tool calls | |
| tool_starts = current_text.count(self.tool_call_start_token) | |
| if self.current_tool_index >= tool_starts: | |
| # No more tool calls | |
| self.is_tool_call_started = False | |
| # Continue processing next tool | |
| return None | |
| # Handle normal content before tool calls | |
| if not self.is_tool_call_started: | |
| # Check if tool call is starting | |
| if ( | |
| self.tool_call_start_token_id in delta_token_ids | |
| or self.tool_call_start_token in delta_text | |
| ): | |
| self.is_tool_call_started = True | |
| # Return any content before the tool call | |
| if self.tool_call_start_token in delta_text: | |
| content_before = delta_text[ | |
| : delta_text.index(self.tool_call_start_token) | |
| ] | |
| if content_before: | |
| return DeltaMessage(content=content_before) | |
| return None | |
| else: | |
| # Check if we're between tool calls - skip whitespace | |
| if current_text.rstrip().endswith(self.tool_call_end_token): | |
| # We just ended a tool call, skip whitespace | |
| if delta_text.strip() == "": | |
| return None | |
| # Normal content, no tool call | |
| return DeltaMessage(content=delta_text) | |
| # Check if we're between tool calls (waiting for next one) | |
| # Count tool calls we've seen vs processed | |
| tool_starts_count = current_text.count(self.tool_call_start_token) | |
| if self.current_tool_index >= tool_starts_count: | |
| # We're past all tool calls, shouldn't be here | |
| return None | |
| # We're in a tool call, find the current tool call portion | |
| # Need to find the correct tool call based on current_tool_index | |
| tool_starts = [] | |
| idx = 0 | |
| while True: | |
| idx = current_text.find(self.tool_call_start_token, idx) | |
| if idx == -1: | |
| break | |
| tool_starts.append(idx) | |
| idx += len(self.tool_call_start_token) | |
| if self.current_tool_index >= len(tool_starts): | |
| # No more tool calls to process yet | |
| return None | |
| tool_start_idx = tool_starts[self.current_tool_index] | |
| # Find where this tool call ends (or current position if not ended yet) | |
| tool_end_idx = current_text.find(self.tool_call_end_token, tool_start_idx) | |
| if tool_end_idx == -1: | |
| tool_text = current_text[tool_start_idx:] | |
| else: | |
| tool_text = current_text[ | |
| tool_start_idx : tool_end_idx + len(self.tool_call_end_token) | |
| ] | |
| # Looking for function header | |
| if not self.header_sent: | |
| if self.tool_call_prefix in tool_text: | |
| func_start = tool_text.find(self.tool_call_prefix) + len( | |
| self.tool_call_prefix | |
| ) | |
| func_end = tool_text.find(">", func_start) | |
| if func_end != -1: | |
| # Found complete function name | |
| self.current_function_name = tool_text[func_start:func_end] | |
| self.current_tool_id = self._generate_tool_call_id() | |
| self.header_sent = True | |
| self.in_function = True | |
| # IMPORTANT: Add to prev_tool_call_arr immediately when we detect a tool call | |
| # This ensures finish_reason="tool_calls" even if parsing isn't complete | |
| already_added = any( | |
| tool.get("name") == self.current_function_name | |
| for tool in self.prev_tool_call_arr | |
| ) | |
| if not already_added: | |
| self.prev_tool_call_arr.append( | |
| { | |
| "name": self.current_function_name, | |
| "arguments": "{}", # Placeholder, will be updated later | |
| } | |
| ) | |
| # Send header with function info | |
| return DeltaMessage( | |
| tool_calls=[ | |
| DeltaToolCall( | |
| index=self.current_tool_index, | |
| id=self.current_tool_id, | |
| function=DeltaFunctionCall( | |
| name=self.current_function_name, arguments="" | |
| ), | |
| type="function", | |
| ) | |
| ] | |
| ) | |
| return None | |
| # We've sent header, now handle function body | |
| if self.in_function: | |
| # Send opening brace if not sent yet | |
| if not self.json_started and not self.parameter_prefix in delta_text: | |
| self.json_started = True | |
| return DeltaMessage( | |
| tool_calls=[ | |
| DeltaToolCall( | |
| index=self.current_tool_index, | |
| function=DeltaFunctionCall(arguments="{"), | |
| ) | |
| ] | |
| ) | |
| # Make sure json_started is set if we're processing parameters | |
| if not self.json_started: | |
| self.json_started = True | |
| # Check for function end in accumulated text | |
| if not self.json_closed and self.function_end_token in tool_text: | |
| # Close JSON | |
| self.json_closed = True | |
| # Extract the complete tool call to update prev_tool_call_arr with final arguments | |
| # Find the function content | |
| func_start = tool_text.find(self.tool_call_prefix) + len( | |
| self.tool_call_prefix | |
| ) | |
| func_content_end = tool_text.find(self.function_end_token, func_start) | |
| if func_content_end != -1: | |
| func_content = tool_text[func_start:func_content_end] | |
| # Parse to get the complete arguments | |
| try: | |
| parsed_tool = self._parse_xml_function_call( | |
| func_content, request.tools if request else None | |
| ) | |
| if parsed_tool: | |
| # Update existing entry in prev_tool_call_arr with complete arguments | |
| for i, tool in enumerate(self.prev_tool_call_arr): | |
| if tool.get("name") == parsed_tool.function.name: | |
| self.prev_tool_call_arr[i]["arguments"] = ( | |
| parsed_tool.function.arguments | |
| ) | |
| break | |
| except Exception: | |
| pass # Ignore parsing errors during streaming | |
| result = DeltaMessage( | |
| tool_calls=[ | |
| DeltaToolCall( | |
| index=self.current_tool_index, | |
| function=DeltaFunctionCall(arguments="}"), | |
| ) | |
| ] | |
| ) | |
| # Reset state for next tool | |
| self.in_function = False | |
| self.json_closed = True | |
| return result | |
| # Look for parameters | |
| # Count how many complete parameters we have processed | |
| complete_params = tool_text.count(self.parameter_end_token) | |
| # Check if we should start a new parameter | |
| if not self.in_param and self.param_count < complete_params: | |
| # Find the unprocessed parameter | |
| # Count parameter starts | |
| param_starts = [] | |
| idx = 0 | |
| while True: | |
| idx = tool_text.find(self.parameter_prefix, idx) | |
| if idx == -1: | |
| break | |
| param_starts.append(idx) | |
| idx += len(self.parameter_prefix) | |
| if len(param_starts) > self.param_count: | |
| # Process the next parameter | |
| param_idx = param_starts[self.param_count] | |
| param_start = param_idx + len(self.parameter_prefix) | |
| remaining = tool_text[param_start:] | |
| if ">" in remaining: | |
| # We have the complete parameter name | |
| name_end = remaining.find(">") | |
| self.current_param_name = remaining[:name_end] | |
| # Find the parameter value | |
| value_start = param_start + name_end + 1 | |
| value_text = tool_text[value_start:] | |
| if value_text.startswith("\n"): | |
| value_text = value_text[1:] | |
| # Find where this parameter ends | |
| param_end_idx = value_text.find(self.parameter_end_token) | |
| if param_end_idx != -1: | |
| # Complete parameter found | |
| param_value = value_text[:param_end_idx] | |
| if param_value.endswith("\n"): | |
| param_value = param_value[:-1] | |
| # Build complete JSON fragment for this parameter | |
| if self.param_count == 0: | |
| json_fragment = ( | |
| '"' | |
| + self.current_param_name | |
| + '": "' | |
| + json.dumps(param_value)[1:-1] | |
| + '"' | |
| ) | |
| else: | |
| json_fragment = ( | |
| ', "' | |
| + self.current_param_name | |
| + '": "' | |
| + json.dumps(param_value)[1:-1] | |
| + '"' | |
| ) | |
| self.param_count += 1 | |
| return DeltaMessage( | |
| tool_calls=[ | |
| DeltaToolCall( | |
| index=self.current_tool_index, | |
| function=DeltaFunctionCall( | |
| arguments=json_fragment | |
| ), | |
| ) | |
| ] | |
| ) | |
| # Continue parameter value | |
| if self.in_param: | |
| if self.parameter_end_token in delta_text: | |
| # End of parameter | |
| end_idx = delta_text.find(self.parameter_end_token) | |
| value_chunk = delta_text[:end_idx] | |
| # Skip past > if at start | |
| if not self.current_param_value and ">" in value_chunk: | |
| gt_idx = value_chunk.find(">") | |
| value_chunk = value_chunk[gt_idx + 1 :] | |
| if not self.current_param_value and value_chunk.startswith("\n"): | |
| value_chunk = value_chunk[1:] | |
| # Calculate incremental JSON | |
| full_value = self.current_param_value + value_chunk | |
| prev_escaped = ( | |
| json.dumps(self.current_param_value)[1:-1] | |
| if self.current_param_value | |
| else "" | |
| ) | |
| full_escaped = json.dumps(full_value)[1:-1] | |
| delta_escaped = full_escaped[len(prev_escaped) :] | |
| self.in_param = False | |
| self.current_param_value = "" | |
| return DeltaMessage( | |
| tool_calls=[ | |
| DeltaToolCall( | |
| index=self.current_tool_index, | |
| function=DeltaFunctionCall( | |
| arguments=delta_escaped + '"' | |
| ), | |
| ) | |
| ] | |
| ) | |
| else: | |
| # Continue accumulating value | |
| value_chunk = delta_text | |
| # Handle first chunk after param name | |
| if not self.current_param_value and ">" in value_chunk: | |
| gt_idx = value_chunk.find(">") | |
| value_chunk = value_chunk[gt_idx + 1 :] | |
| if not self.current_param_value and value_chunk.startswith("\n"): | |
| value_chunk = value_chunk[1:] | |
| if value_chunk: | |
| # Stream the escaped delta | |
| prev_escaped = ( | |
| json.dumps(self.current_param_value)[1:-1] | |
| if self.current_param_value | |
| else "" | |
| ) | |
| self.current_param_value += value_chunk | |
| full_escaped = json.dumps(self.current_param_value)[1:-1] | |
| delta_escaped = full_escaped[len(prev_escaped) :] | |
| if delta_escaped: | |
| return DeltaMessage( | |
| tool_calls=[ | |
| DeltaToolCall( | |
| index=self.current_tool_index, | |
| function=DeltaFunctionCall( | |
| arguments=delta_escaped | |
| ), | |
| ) | |
| ] | |
| ) | |
| return None | |