diff --git a/README.md b/README.md index 9368d49..bef2f2d 100644 --- a/README.md +++ b/README.md @@ -206,36 +206,95 @@ println("Message sent!") #### Python/Micropython ```python +import nats +import asyncio from nats_bridge import smartreceive -# Receive and process message -envelope = smartreceive(msg) -for dataname, data, type in envelope["payloads"]: - print(f"Received {dataname}: {data}") +# Configuration +SUBJECT = "/chat/room1" +NATS_URL = "nats://localhost:4222" + +async def main(): + # Connect to NATS + nc = await nats.connect(NATS_URL) + + # Subscribe to the subject - msg comes from the callback + async def message_handler(msg): + # Receive and process message + envelope = smartreceive(msg.data) + for dataname, data, type in envelope["payloads"]: + print(f"Received {dataname}: {data}") + + sid = await nc.subscribe(SUBJECT, cb=message_handler) + await asyncio.sleep(120) # Keep listening + await nc.close() + +asyncio.run(main()) ``` #### JavaScript ```javascript const { smartreceive } = require('./src/NATSBridge'); +const { connect } = require('nats'); -// Receive and process message -const envelope = await smartreceive(msg); -for (const payload of envelope.payloads) { - console.log(`Received ${payload.dataname}: ${payload.data}`); +// Configuration +const SUBJECT = "/chat/room1"; +const NATS_URL = "nats://localhost:4222"; + +async function main() { + // Connect to NATS + const nc = await connect({ servers: [NATS_URL] }); + + // Subscribe to the subject - msg comes from the async iteration + const sub = nc.subscribe(SUBJECT); + + for await (const msg of sub) { + // Receive and process message + const envelope = await smartreceive(msg); + for (const payload of envelope.payloads) { + console.log(`Received ${payload.dataname}: ${payload.data}`); + } + } } + +main(); ``` #### Julia ```julia -using NATSBridge +using NATS, NATSBridge -# Receive and process message -envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler) -for (dataname, data, type) in envelope["payloads"] - println("Received $dataname: $data") +# Configuration +const SUBJECT = "/chat/room1" +const NATS_URL = "nats://localhost:4222" + +# Helper: Log with correlation ID +function log_trace(message) + timestamp = Dates.now() + println("[$timestamp] $message") end + +# Receiver: Listen for messages - msg comes from the callback +function test_receive() + conn = NATS.connect(NATS_URL) + NATS.subscribe(conn, SUBJECT) do msg + log_trace("Received message on $(msg.subject)") + + # Receive and process message + envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler) + for (dataname, data, type) in envelope["payloads"] + println("Received $dataname: $data") + end + end + + # Keep listening for 120 seconds + sleep(120) + NATS.drain(conn) +end + +test_receive() ``` --- @@ -323,8 +382,9 @@ Receives and processes messages from NATS, handling both direct and link transpo ```python from nats_bridge import smartreceive +# Note: For nats-py, use msg.data to pass the raw message data envelope = smartreceive( - msg, # NATS message + msg.data, # NATS message data (msg.data for nats-py) fileserver_download_handler=_fetch_with_backoff, # Download handler max_retries=5, # Max retry attempts base_delay=100, # Initial delay in ms @@ -338,8 +398,9 @@ envelope = smartreceive( ```javascript const { smartreceive } = require('./src/NATSBridge'); +// Note: msg is the NATS message object from subscription const envelope = await smartreceive( - msg, // NATS message + msg, // NATS message (raw object from subscription) { fileserverDownloadHandler: customDownloadHandler, maxRetries: 5, @@ -355,6 +416,7 @@ const envelope = await smartreceive( ```julia using NATSBridge +# Note: msg is a NATS.Msg object passed from the subscription callback envelope = NATSBridge.smartreceive( msg::NATS.Msg; fileserverDownloadHandler::Function = _fetch_with_backoff, @@ -591,13 +653,30 @@ env = smartsend( #### Python/Micropython (Responder) ```python -from nats_bridge import smartreceive +import nats +import asyncio +from nats_bridge import smartreceive, smartsend -envelope = smartreceive(msg) -for dataname, data, type in envelope["payloads"]: - if data.get("action") == "read_sensor": - response = {"sensor_id": "sensor-001", "value": 42.5} - smartsend("/device/response", [("data", response, "dictionary")]) +# Configuration +SUBJECT = "/device/command" +REPLY_SUBJECT = "/device/response" +NATS_URL = "nats://localhost:4222" + +async def main(): + nc = await nats.connect(NATS_URL) + + async def message_handler(msg): + envelope = smartreceive(msg.data) + for dataname, data, type in envelope["payloads"]: + if data.get("action") == "read_sensor": + response = {"sensor_id": "sensor-001", "value": 42.5} + smartsend(REPLY_SUBJECT, [("data", response, "dictionary")]) + + sid = await nc.subscribe(SUBJECT, cb=message_handler) + await asyncio.sleep(120) + await nc.close() + +asyncio.run(main()) ``` #### JavaScript (Requester) @@ -614,16 +693,32 @@ await smartsend("/device/command", [ #### JavaScript (Responder) ```javascript const { smartreceive, smartsend } = require('./src/NATSBridge'); +const { connect } = require('nats'); -const envelope = await smartreceive(msg); -for (const payload of envelope.payloads) { - if (payload.dataname === "command" && payload.data.action === "read_sensor") { - const response = { sensor_id: "sensor-001", value: 42.5 }; - await smartsend("/device/response", [ - { dataname: "data", data: response, type: "dictionary" } - ]); +// Configuration +const SUBJECT = "/device/command"; +const REPLY_SUBJECT = "/device/response"; +const NATS_URL = "nats://localhost:4222"; + +async function main() { + const nc = await connect({ servers: [NATS_URL] }); + + const sub = nc.subscribe(SUBJECT); + + for await (const msg of sub) { + const envelope = await smartreceive(msg); + for (const payload of envelope.payloads) { + if (payload.dataname === "command" && payload.data.action === "read_sensor") { + const response = { sensor_id: "sensor-001", value: 42.5 }; + await smartsend(REPLY_SUBJECT, [ + { dataname: "data", data: response, type: "dictionary" } + ]); + } + } } } + +main(); ``` #### Julia (Requester) @@ -639,15 +734,30 @@ env = NATSBridge.smartsend( #### Julia (Responder) ```julia -using NATSBridge +using NATS, NATSBridge -envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler) -for (dataname, data, type) in envelope["payloads"] - if dataname == "command" && data["action"] == "read_sensor" - response = Dict("sensor_id" => "sensor-001", "value" => 42.5) - smartsend("/device/response", [("data", response, "dictionary")]) +# Configuration +const SUBJECT = "/device/command" +const REPLY_SUBJECT = "/device/response" +const NATS_URL = "nats://localhost:4222" + +function test_responder() + conn = NATS.connect(NATS_URL) + NATS.subscribe(conn, SUBJECT) do msg + envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler) + for (dataname, data, type) in envelope["payloads"] + if dataname == "command" && data["action"] == "read_sensor" + response = Dict("sensor_id" => "sensor-001", "value" => 42.5) + smartsend(REPLY_SUBJECT, [("data", response, "dictionary")]) + end + end end + + sleep(120) + NATS.drain(conn) end + +test_responder() ``` ### Example 5: Micropython IoT Device @@ -656,47 +766,91 @@ Lightweight Micropython device sending sensor data. #### Micropython ```python +import nats +import asyncio from nats_bridge import smartsend, smartreceive -import json -# Send sensor data -data = [("temperature", "25.5", "text"), ("humidity", 65, "dictionary")] -smartsend("/device/sensors", data, nats_url="nats://localhost:4222") +# Configuration +SUBJECT = "/device/sensors" +NATS_URL = "nats://localhost:4222" -# Receive commands -envelope = smartreceive(msg) -for dataname, data, type in envelope["payloads"]: - if type == "dictionary" and data.get("action") == "reboot": - # Execute reboot - pass +async def main(): + nc = await nats.connect(NATS_URL) + + # Send sensor data + data = [("temperature", "25.5", "text"), ("humidity", 65, "dictionary")] + smartsend("/device/sensors", data, nats_url="nats://localhost:4222") + + # Receive commands - msg comes from the callback + async def message_handler(msg): + envelope = smartreceive(msg.data) + for dataname, data, type in envelope["payloads"]: + if type == "dictionary" and data.get("action") == "reboot": + # Execute reboot + pass + + sid = await nc.subscribe(SUBJECT, cb=message_handler) + await asyncio.sleep(120) + await nc.close() + +asyncio.run(main()) ``` #### JavaScript (Receiver) ```javascript const { smartreceive } = require('./src/NATSBridge'); +const { connect } = require('nats'); -const envelope = await smartreceive(msg); -for (const payload of envelope.payloads) { - if (payload.dataname === "temperature") { - console.log(`Temperature: ${payload.data}`); - } else if (payload.dataname === "humidity") { - console.log(`Humidity: ${payload.data}`); +// Configuration +const SUBJECT = "/device/sensors"; +const NATS_URL = "nats://localhost:4222"; + +async function main() { + const nc = await connect({ servers: [NATS_URL] }); + + const sub = nc.subscribe(SUBJECT); + + for await (const msg of sub) { + const envelope = await smartreceive(msg); + for (const payload of envelope.payloads) { + if (payload.dataname === "temperature") { + console.log(`Temperature: ${payload.data}`); + } else if (payload.dataname === "humidity") { + console.log(`Humidity: ${payload.data}`); + } + } } } + +main(); ``` #### Julia (Receiver) ```julia -using NATSBridge +using NATS, NATSBridge -envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler) -for (dataname, data, type) in envelope["payloads"] - if dataname == "temperature" - println("Temperature: $data") - elseif dataname == "humidity" - println("Humidity: $data") +# Configuration +const SUBJECT = "/device/sensors" +const NATS_URL = "nats://localhost:4222" + +function test_receiver() + conn = NATS.connect(NATS_URL) + NATS.subscribe(conn, SUBJECT) do msg + envelope = NATSBridge.smartreceive(msg, fileserverDownloadHandler) + for (dataname, data, type) in envelope["payloads"] + if dataname == "temperature" + println("Temperature: $data") + elseif dataname == "humidity" + println("Humidity: $data") + end + end end + + sleep(120) + NATS.drain(conn) end + +test_receiver() ``` ---