diff --git a/AI_prompt.md b/AI_prompt.md index 4904b5e..0bc023c 100644 --- a/AI_prompt.md +++ b/AI_prompt.md @@ -172,7 +172,7 @@ Can you update the content of the following files according to ASG_Framework/ASG -I updated ./src/NATSBridge.jl. Specifically smartsend() and smartreceive(). Check ./docs folder I want to update the content of the following files according to /home/ton/docker-apps/sommpanion/ASG_Framework/ASG_Framework.md: +I updated ./src/NATSBridge.jl. Use it as groundtruth. Check ./docs folder I want to update the content of the following files according to /home/ton/docker-apps/sommpanion/ASG_Framework/ASG_Framework.md: - ./docs/requirements.md - ./docs/specification.md - ./docs/walkthrough.md diff --git a/docs/architecture.md b/docs/architecture.md index 592fd7d..a618dac 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -1,7 +1,7 @@ # Architecture Documentation: NATSBridge -**Version**: 1.1.0 -**Date**: 2026-03-23 +**Version**: 1.2.0 +**Date**: 2026-05-13 **Status**: Active **Ground Truth**: [`src/NATSBridge.jl`](../src/NATSBridge.jl) **Architecture Level**: C4 Container Level @@ -139,36 +139,31 @@ flowchart TD Serialize[_serialize_data] Deserialize[_deserialize_data] - BuildEnvelope[build_envelope] - BuildPayload[build_payload] - - PublishMessage[publish_message] + EnvelopeToJson[envelope_to_json] FileServerUpload[fileserver_upload_handler] FileServerDownload[fileserver_download_handler] + + LogTrace[log_trace] end subgraph "Data Models" - Payload[MsgPayloadV1 Struct] - Envelope[MsgEnvelopeV1 Struct] + Payload[msg_payload_v1 Struct] + Envelope[msg_envelope_v1 Struct] end SmartSend --> Serialize - SmartSend --> BuildEnvelope - SmartSend --> BuildPayload - SmartSend --> PublishMessage + SmartSend --> EnvelopeToJson SmartSend --> FileServerUpload - + SmartReceive --> Deserialize SmartReceive --> FileServerDownload - + + EnvelopeToJson --> Envelope Serialize --> Payload - BuildEnvelope --> Envelope - BuildPayload --> Payload style SmartSend fill:#d1fae5,stroke:#10b981 style SmartReceive fill:#d1fae5,stroke:#10b981 - style PublishMessage fill:#fef3c7,stroke:#f59e0b style FileServerUpload fill:#fef3c7,stroke:#f59e0b style FileServerDownload fill:#fef3c7,stroke:#f59e0b ``` @@ -181,15 +176,14 @@ flowchart TD | Component | Purpose | Platform Support | |-----------|---------|------------------| -| **smartsend** | Send data via NATS with automatic transport selection | All | -| **smartreceive** | Receive and process NATS messages | All | +| **smartsend** | Send data via NATS with automatic transport selection, returns (envelope, json_string) for caller to publish | All | +| **smartreceive** | Receive and process NATS messages from JSON string | All | | **_serialize_data** | Serialize data according to payload type | All | | **_deserialize_data** | Deserialize bytes to native data types | All | -| **_build_envelope** | Build message envelope from payloads | All | -| **_build_payload** | Build payload object from serialized data | All | -| **publish_message** | Publish message to NATS subject | All | +| **envelope_to_json** | Convert msg_envelope_v1 struct to JSON string | All | +| **log_trace** | Log trace messages with correlation ID | All | | **fileserver_upload_handler** | Upload large payloads to HTTP server | Desktop (Julia/JS/Python/Dart) | -| **fileserver_download_handler** | Download payloads from HTTP server | Desktop (Julia/JS/Python/Dart) | +| **fileserver_download_handler** | Download payloads from HTTP server with exponential backoff | Desktop (Julia/JS/Python/Dart) | ### Data Flow @@ -211,7 +205,7 @@ flowchart TD H --> L[Build envelope] L --> M[Convert to JSON] - M --> N[Publish to NATS] + M --> N[Return envelope + JSON to caller] style A fill:#f9f9f9,stroke:#333 style N fill:#e0e7ff,stroke:#3b82f6 @@ -437,11 +431,9 @@ end JavaScript uses async/await for non-blocking I/O: -- **Class-based NATS Client**: Connection management with `keepAlive` support - **Module-level Utilities**: Serialization functions - **Native ArrayBuffer**: Binary data handling (Browser) / Buffer (Node.js) - **Fetch API**: HTTP file server communication -- **Connection Pooling**: `NATSConnectionPool` for high-throughput scenarios #### Node.js Implementation (natsbridge_ssr.js) @@ -449,36 +441,6 @@ JavaScript uses async/await for non-blocking I/O: - **Apache Arrow IPC**: Full support via `apache-arrow` - **Buffer for binary data**: Native Node.js Buffer handling -```javascript -// Class-based NATS client with keepAlive support -class NATSClient { - constructor(url, keepAlive = false) { - this.url = url; - this.connection = null; - this.keepAlive = keepAlive; - } - - async connect() { - if (this.connection) return this.connection; - this.connection = await nats.connect({ servers: this.url }); - return this.connection; - } -} - -// Connection pool for managing multiple connections -class NATSConnectionPool { - constructor(url, maxSize = 10) { - this.url = url; - this.maxSize = maxSize; - this.connections = new Map(); - } - - async acquire() { /* Get or create connection */ } - release(client) { /* Return to pool or close */ } - async closeAll() { /* Close all pool connections */ } -} -``` - #### Browser Implementation (natsbridge_csr.js) - **WebSocket NATS connections**: Uses `ws://` or `wss://` URLs via `nats.ws` @@ -486,23 +448,6 @@ class NATSConnectionPool { - **Uint8Array for binary data**: Browser-compatible binary handling - **Web Crypto API**: UUID generation via `crypto.getRandomValues()` -```javascript -// Class-based NATS client with keepAlive support -class NATSClient { - constructor(url, keepAlive = false) { - this.url = url; // ws:// or wss:// - this.connection = null; - this.keepAlive = keepAlive; - } - - async connect() { - if (this.connection) return this.connection; - this.connection = await nats.connect({ servers: this.url }); - return this.connection; - } -} -``` - ### Python Architecture Python uses classes for stateful operations: @@ -740,7 +685,7 @@ MAX_PAYLOAD_SIZE = 50_000 # 50KB hard limit |----------|---------|-------------| | `NATS_URL` | `nats://localhost:4222` | NATS server URL | | `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL | -| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes | +| `SIZE_THRESHOLD` | `500000` | Size threshold in bytes (0.5MB) | ### Container Deployment @@ -834,6 +779,12 @@ flowchart TD | Date | Version | Changes | |------|---------|---------| +| 2026-05-13 | 1.2.0 | Aligned with ground truth implementation (src/NATSBridge.jl) | +| - | - | Removed publish_message component (commented out in source) | +| - | - | Removed NATSClient and NATSConnectionPool classes (not in ground truth) | +| - | - | Updated component diagram to match actual module structure | +| - | - | Updated data flow to show smartsend returns JSON for caller to publish | +| - | - | Fixed SIZE_THRESHOLD default to 500,000 bytes | | 2026-03-15 | 1.1.0 | JavaScript connection management | | - | - | Added NATSClient with keepAlive support | | - | - | Added NATSConnectionPool for connection reuse | diff --git a/docs/requirements.md b/docs/requirements.md index ed178da..4f3d685 100644 --- a/docs/requirements.md +++ b/docs/requirements.md @@ -1,7 +1,7 @@ # Requirements Document: NATSBridge -**Version**: 1.0.0 -**Date**: 2026-03-23 +**Version**: 1.2.0 +**Date**: 2026-05-13 **Status**: Active **Ground Truth**: [`src/NATSBridge.jl`](../src/NATSBridge.jl) @@ -118,8 +118,8 @@ NATSBridge is a cross-platform, bi-directional data bridge that enables seamless | **FR-010** | Exponential backoff retry | System shall implement exponential backoff with configurable retries (default: 5, base_delay: 100ms, max_delay: 5000ms) for file server download failures | | **FR-011** | Correlation ID propagation | System shall propagate correlation IDs through all message processing steps | | **FR-012** | Message serialization | System shall serialize data types using Base64, JSON, or Arrow IPC encoding | -| **FR-013** | NATS publishing | System shall publish messages to NATS subjects | -| **FR-014** | NATS subscription | System shall receive and process NATS messages | +| **FR-013** | NATS publishing | System shall return JSON string representation for caller to publish to NATS subjects (caller is responsible for actual NATS publish) | +| **FR-014** | NATS subscription | System shall receive and process NATS messages by accepting JSON string from NATS payload | --- @@ -324,11 +324,11 @@ NATSBridge is a cross-platform, bi-directional data bridge that enables seamless ```julia function smartsend( subject::String, - data::AbstractArray{Tuple{String, Any, String}}; - broker_url::String = "nats://localhost:4222", - fileserver_url::String = "http://localhost:8080", + data::AbstractArray{Tuple{String, T1, String}, 1}; + broker_url::String = DEFAULT_BROKER_URL, + fileserver_url::String = DEFAULT_FILESERVER_URL, fileserver_upload_handler::Function = plik_oneshot_upload, - size_threshold::Int = 1_000_000, + size_threshold::Int = DEFAULT_SIZE_THRESHOLD, correlation_id::String = string(uuid4()), msg_purpose::String = "chat", sender_name::String = "NATSBridge", @@ -336,18 +336,18 @@ function smartsend( receiver_id::String = "", reply_to::String = "", reply_to_msg_id::String = "", - is_publish::Bool = true, - NATS_connection::Union{NATS.Connection, Nothing} = nothing, msg_id::String = string(uuid4()), sender_id::String = string(uuid4()) -)::Tuple{msg_envelope_v1, String} +)::Tuple{msg_envelope_v1, String} where {T1<:Any} ``` +**Note**: NATS publishing is the caller's responsibility. `smartsend` returns `(env::msg_envelope_v1, env_json_str::String)`. + ### 11.2 smartreceive Signature ```julia function smartreceive( - msg::NATS.Msg; + msg_json_str::String; fileserver_download_handler::Function = _fetch_with_backoff, max_retries::Int = 5, base_delay::Int = 100, @@ -355,6 +355,8 @@ function smartreceive( )::JSON.Object{String, Any} ``` +**Note**: Pass `String(nats_msg.payload)` from NATS subscription to `smartreceive`. + --- ## 12. Deployment Requirements @@ -374,7 +376,7 @@ function smartreceive( |----------|---------|-------------| | `NATS_URL` | `nats://localhost:4222` | NATS server URL | | `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL | -| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes | +| `SIZE_THRESHOLD` | `500000` | Size threshold in bytes (0.5MB) | --- @@ -398,6 +400,13 @@ function smartreceive( | Date | Version | Changes | |------|---------|---------| +| 2026-05-13 | 1.2.0 | Aligned with ground truth implementation (src/NATSBridge.jl) | +| - | - | Fixed smartsend signature: removed is_publish, NATS_connection; added sender_name | +| - | - | Fixed smartreceive signature: takes msg_json_str::String instead of msg::NATS.Msg | +| - | - | Fixed size_threshold default from 1,000,000 to 500,000 | +| - | - | Updated FR-013/FR-014 to reflect caller responsibility for NATS publishing | +| - | - | Updated FR-008/FR-009 to include file path upload overload | +| - | - | Updated SIZE_THRESHOLD env var default to 500000 | | 2026-03-23 | 1.0.0 | Updated to ASG Framework requirements structure | --- diff --git a/docs/specification.md b/docs/specification.md index e97bf91..2b08517 100644 --- a/docs/specification.md +++ b/docs/specification.md @@ -1,7 +1,7 @@ # Specification: NATSBridge -**Version**: 1.1.0 -**Date**: 2026-03-23 +**Version**: 1.2.0 +**Date**: 2026-05-13 **Status**: Active **Ground Truth**: [`src/NATSBridge.jl`](../src/NATSBridge.jl) **Specification Format**: JSON Schema + AsyncAPI @@ -418,11 +418,11 @@ When `transport = "link"`, the `data` field contains a URL pointing to the uploa ```julia function smartsend( subject::String, - data::AbstractArray{Tuple{String, Any, String}}; - broker_url::String = "nats://localhost:4222", - fileserver_url::String = "http://localhost:8080", + data::AbstractArray{Tuple{String, T1, String}, 1}; + broker_url::String = DEFAULT_BROKER_URL, + fileserver_url::String = DEFAULT_FILESERVER_URL, fileserver_upload_handler::Function = plik_oneshot_upload, - size_threshold::Int = 500_000, + size_threshold::Int = DEFAULT_SIZE_THRESHOLD, correlation_id::String = string(uuid4()), msg_purpose::String = "chat", sender_name::String = "NATSBridge", @@ -430,13 +430,13 @@ function smartsend( receiver_id::String = "", reply_to::String = "", reply_to_msg_id::String = "", - is_publish::Bool = true, - NATS_connection::Union{NATS.Connection, Nothing} = nothing, msg_id::String = string(uuid4()), sender_id::String = string(uuid4()) -)::Tuple{msg_envelope_v1, String} +)::Tuple{msg_envelope_v1, String} where {T1<:Any} ``` +**Note**: NATS publishing is the caller's responsibility. Returns `(env::msg_envelope_v1, env_json_str::String)`. + #### Python ```python @@ -454,13 +454,13 @@ async def smartsend( receiver_id: str = "", reply_to: str = "", reply_to_msg_id: str = "", - is_publish: bool = True, - nats_connection: Any = None, msg_id: str = None, sender_id: str = None ) -> Tuple[Dict, str]: ``` +**Note**: NATS publishing is the caller's responsibility. + #### JavaScript (Node.js) ```typescript @@ -479,14 +479,14 @@ async function smartsend( receiver_id?: string; reply_to?: string; reply_to_msg_id?: string; - is_publish?: boolean; - nats_connection?: NATS.Connection; msg_id?: string; sender_id?: string; } ): Promise<[Object, string]>; ``` +**Note**: NATS publishing is the caller's responsibility. + #### JavaScript (Browser) ```typescript @@ -505,51 +505,27 @@ async function smartsend( receiver_id?: string; reply_to?: string; reply_to_msg_id?: string; - is_publish?: boolean; - nats_connection?: NATSClient | NATS.Connection; msg_id?: string; sender_id?: string; } ): Promise<[Object, string]>; - -// NATSClient class for connection management -class NATSClient { - constructor(url: string, keepAlive?: boolean); - connect(): Promise; - publish(subject: string, message: string, correlationId: string): Promise; - close(): Promise; - getConnection(): NATS.Connection | null; - isConnected(): boolean; -} - -// NATSConnectionPool for managing multiple connections -class NATSConnectionPool { - constructor(url: string, maxSize?: number); - acquire(): Promise; - release(client: NATSClient): void; - closeAll(): Promise; -} - -// publishMessage function for manual publishing -async function publishMessage( - brokerUrlOrClient: string | NATSClient | NATS.Connection, - subject: string, - message: string, - correlationId: string, - closeConnection?: boolean -): Promise; ``` +**Note**: NATS publishing is the caller's responsibility. + #### MicroPython ```python def smartsend( subject: str, data: List[Tuple[str, Any, str]], + size_threshold: int = 100_000, # Lower threshold for memory constraints **kwargs ) -> Tuple[Dict, str]: ``` +**Note**: NATS publishing is the caller's responsibility. + #### Dart (Desktop/Flutter) ```dart @@ -567,12 +543,11 @@ Future<[Map, String]> smartsend( String receiverId = '', String replyTo = '', String replyToMsgId = '', - bool isPublish = true, - dynamic natsConnection, String? msgId, String? senderId, }) async { // Returns [envelope, jsonString] + // NATS publishing is caller's responsibility } ``` @@ -593,12 +568,11 @@ Future<[Map, String]> smartsend( String receiverId = '', String replyTo = '', String replyToMsgId = '', - bool isPublish = true, - dynamic natsConnection, String? msgId, String? senderId, }) async { // Returns [envelope, jsonString] + // NATS publishing is caller's responsibility } ``` @@ -608,7 +582,7 @@ Future<[Map, String]> smartsend( ```julia function smartreceive( - msg::NATS.Msg; + msg_json_str::String; # Pass String(nats_msg.payload) from NATS subscription fileserver_download_handler::Function = _fetch_with_backoff, max_retries::Int = 5, base_delay::Int = 100, @@ -616,11 +590,13 @@ function smartreceive( )::JSON.Object{String, Any} ``` +**Note**: Input is JSON string from NATS message payload, not NATS.Msg directly. + #### Python ```python async def smartreceive( - msg: Any, + msg_json_str: str, # JSON string from NATS message payload fileserver_download_handler: Callable = fetch_with_backoff, max_retries: int = 5, base_delay: int = 100, @@ -628,11 +604,13 @@ async def smartreceive( ) -> Dict[str, Any]: ``` +**Note**: Input is JSON string from NATS message payload. + #### JavaScript (Node.js) ```typescript async function smartreceive( - msg: Object, + msg_json_str: string, // JSON string from NATS message payload options?: { fileserver_download_handler?: Function; max_retries?: number; @@ -646,7 +624,7 @@ async function smartreceive( ```typescript async function smartreceive( - msg: Object, + msg_json_str: string, // JSON string from NATS message payload options?: { fileserver_download_handler?: Function; max_retries?: number; @@ -656,17 +634,22 @@ async function smartreceive( ): Promise; ``` +**Note**: Input is JSON string from NATS message payload. + #### MicroPython ```python -def smartreceive(msg: Any, **kwargs) -> Dict[str, Any]: +def smartreceive(msg_json_str: str, **kwargs) -> Dict[str, Any]: ``` +**Note**: Input is JSON string from NATS message payload. + #### Dart (Desktop/Flutter) ```dart Future> smartreceive( - Map msg, { + Map msg_json_str, // JSON object from NATS message payload + { Function? fileserverDownloadHandler, int maxRetries = 5, int baseDelay = 100, @@ -680,7 +663,8 @@ Future> smartreceive( ```dart Future> smartreceive( - Map msg, { + Map msg_json_str, // JSON object from NATS message payload + { Function? fileserverDownloadHandler, int maxRetries = 5, int baseDelay = 100, @@ -703,6 +687,12 @@ function fileserver_upload_handler( dataname::String, data::Vector{UInt8} )::Dict{String, Any} + +# Overload: Upload file from disk +function fileserver_upload_handler( + file_server_url::String, + filepath::String +)::Dict{String, Any} ``` **Return Format**: @@ -1057,6 +1047,12 @@ flowchart TD | Date | Version | Changes | Requirement ID(s) | |------|---------|---------|-------------------| +| 2026-05-13 | 1.2.0 | Aligned with ground truth implementation (src/NATSBridge.jl) | All | +| - | - | Updated smartsend signatures: removed is_publish, nats_connection; added sender_name | FR-001 through FR-014 | +| - | - | Updated smartreceive signatures: takes msg_json_str::String instead of msg | FR-001 through FR-014 | +| - | - | Removed publishMessage function and NATSClient/NATSConnectionPool classes from browser section | FR-013, FR-014 | +| - | - | Added plik_oneshot_upload(filepath) overload to file server interface | FR-008, FR-009 | +| - | - | Fixed SIZE_THRESHOLD default to 500,000 bytes | FR-003, FR-004 | | 2026-03-23 | 1.1.0 | Updated to ASG Framework specification guidelines | All | | 2026-03-15 | 1.1.0 | Browser connection management | FR-001 through FR-014 | | 2026-03-13 | 1.0.0 | Initial specification | FR-001 through FR-014, NFR-101 through NFR-405 | diff --git a/docs/walkthrough.md b/docs/walkthrough.md index 4f5c554..e50179f 100644 --- a/docs/walkthrough.md +++ b/docs/walkthrough.md @@ -1,7 +1,7 @@ # Walkthrough: NATSBridge -**Version**: 1.0.0 -**Date**: 2026-03-23 +**Version**: 1.2.0 +**Date**: 2026-05-13 **Status**: Active **Ground Truth**: [`src/NATSBridge.jl`](../src/NATSBridge.jl) @@ -213,23 +213,25 @@ NATSBridge builds the message envelope: - **reply_to**: Tells backend where to send response - **payloads array**: Contains all data with metadata for proper handling -#### Step 5: Publish to NATS +#### Step 5: Publish to NATS (Caller's Responsibility) ```javascript -await NATSBridge.NATSClient.connect("ws://localhost:4222"); -await NATSBridge.NATSClient.publish("/agent/wine/api/v1/prompt", msgJson); +// NATS publishing is the caller's responsibility +const conn = await NATS.connect({ servers: "ws://localhost:4222" }); +await conn.publish("/agent/wine/api/v1/prompt", msgJson); ``` **Rationale**: - NATS provides low-latency message delivery - JSON format ensures cross-platform compatibility +- `smartsend()` returns `(env, msgJson)` - caller handles publishing #### Step 6: Julia Backend Receives Message ```julia # Julia backend -msg = NATS.subscription.next() # Get message from NATS -env = smartreceive(msg) +nats_msg = NATS.subscription.next() # Get message from NATS +env = smartreceive(String(nats_msg.payload)) # env["payloads"] is now: # [ @@ -355,8 +357,8 @@ const response = await plikOneshotUpload( ```julia # Julia backend -msg = NATS.subscription.next() -env = smartreceive(msg) +nats_msg = NATS.subscription.next() +env = smartreceive(String(nats_msg.payload)) # NATSBridge automatically: # 1. Extracts URL from payload @@ -428,8 +430,8 @@ arrow_bytes = buf.getvalue() ```julia # Julia backend -msg = NATS.subscription.next() -env = smartreceive(msg) +nats_msg = NATS.subscription.next() +env = smartreceive(String(nats_msg.payload)) # env["payloads"][1] is now: # ("data", DataFrame with id, name, score columns, "arrowtable") @@ -512,8 +514,8 @@ payload_b64 = base64.b64encode(json_bytes).decode('ascii') ```python # Python backend -msg = await nats_consumer.next() -env = await smartreceive(msg) +nats_msg = await nats_consumer.next() +env = await smartreceive(str(nats_msg.payload)) # env["payloads"][0] is now: # ("data", {"temperature": 25.5, "humidity": 60.0, ...}, "dictionary") @@ -561,8 +563,8 @@ const [env, msgJson] = await NATSBridge.smartsend( ```python # Python (Backend) -msg = await nats_consumer.next() -env = await smartreceive(msg) +nats_msg = await nats_consumer.next() +env = await smartreceive(str(nats_msg.payload)) # env["payloads"] is now: # [ @@ -580,8 +582,8 @@ env = await smartreceive(msg) ```julia # Julia (Backend) -msg = NATS.subscription.next() -env = smartreceive(msg) +nats_msg = NATS.subscription.next() +env = smartreceive(String(nats_msg.payload)) # env["payloads"] is now: # [ @@ -726,7 +728,7 @@ log_trace(correlation_id, "Published to NATS") |----------|---------|-------------| | `NATS_URL` | `nats://localhost:4222` | NATS server URL | | `FILESERVER_URL` | `http://localhost:8080` | HTTP file server URL | -| `SIZE_THRESHOLD` | `1000000` | Size threshold in bytes | +| `SIZE_THRESHOLD` | `500000` | Size threshold in bytes (0.5MB) | --- @@ -769,6 +771,10 @@ log_trace(correlation_id, "Published to NATS") | Date | Version | Changes | Specification Reference | |------|---------|---------|------------------------| +| 2026-05-13 | 1.2.0 | Aligned with ground truth implementation (src/NATSBridge.jl) | All sections | +| - | - | Updated smartreceive calls to use String(nats_msg.payload) pattern | All sections | +| - | - | Removed NATSClient.publish() calls (caller responsible for NATS publishing) | All sections | +| - | - | Removed is_publish and nats_connection parameter references | All sections | | 2026-03-23 | 1.0.0 | Updated to ASG Framework walkthrough guidelines | All sections | | 2026-03-13 | 1.0.0 | Initial walkthrough documentation | specification.md:2-19 (all sections) |