update
This commit is contained in:
@@ -103,9 +103,9 @@ smartsend(
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Receive returns a dictionary envelope with all metadata and deserialized payloads
|
# Receive returns a dictionary envelope with all metadata and deserialized payloads
|
||||||
envelope = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay)
|
env = smartreceive(msg, fileserverDownloadHandler, max_retries, base_delay, max_delay)
|
||||||
# envelope["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
|
# env["payloads"] = [("dataname1", data1, type1), ("dataname2", data2, type2), ...]
|
||||||
# envelope["correlationId"], envelope["msgId"], etc.
|
# env["correlationId"], env["msgId"], etc.
|
||||||
```
|
```
|
||||||
|
|
||||||
## Architecture Diagram
|
## Architecture Diagram
|
||||||
|
|||||||
@@ -59,9 +59,9 @@ smartsend("/test", [(dataname1, data1, "text")], ...)
|
|||||||
smartsend("/test", [(dataname1, data1, "dictionary"), (dataname2, data2, "table")], ...)
|
smartsend("/test", [(dataname1, data1, "dictionary"), (dataname2, data2, "table")], ...)
|
||||||
|
|
||||||
# Receive returns a dictionary envelope with all metadata and deserialized payloads
|
# Receive returns a dictionary envelope with all metadata and deserialized payloads
|
||||||
envelope = smartreceive(msg, ...)
|
env = smartreceive(msg, ...)
|
||||||
# envelope["payloads"] = [(dataname1, data1, "text"), (dataname2, data2, "table"), ...]
|
# env["payloads"] = [(dataname1, data1, "text"), (dataname2, data2, "table"), ...]
|
||||||
# envelope["correlationId"], envelope["msgId"], etc.
|
# env["correlationId"], env["msgId"], etc.
|
||||||
```
|
```
|
||||||
|
|
||||||
## Cross-Platform Interoperability
|
## Cross-Platform Interoperability
|
||||||
@@ -104,8 +104,8 @@ smartsend("/cross_platform", data, nats_url="nats://localhost:4222")
|
|||||||
```javascript
|
```javascript
|
||||||
// JavaScript receiver
|
// JavaScript receiver
|
||||||
const { smartreceive } = require('./src/NATSBridge');
|
const { smartreceive } = require('./src/NATSBridge');
|
||||||
const envelope = await smartreceive(msg);
|
const env = await smartreceive(msg);
|
||||||
// envelope.payloads[0].data === "Hello from Julia!"
|
// env.payloads[0].data === "Hello from Julia!"
|
||||||
```
|
```
|
||||||
|
|
||||||
```python
|
```python
|
||||||
@@ -330,18 +330,18 @@ const nc = await connect({ servers: ['nats://localhost:4222'] });
|
|||||||
const sub = nc.subscribe("control");
|
const sub = nc.subscribe("control");
|
||||||
|
|
||||||
for await (const msg of sub) {
|
for await (const msg of sub) {
|
||||||
const envelope = await smartreceive(msg);
|
const env = await smartreceive(msg);
|
||||||
|
|
||||||
// Process the payloads from the envelope
|
// Process the payloads from the envelope
|
||||||
for (const payload of envelope.payloads) {
|
for (const payload of env.payloads) {
|
||||||
const { dataname, data, type } = payload;
|
const { dataname, data, type } = payload;
|
||||||
console.log(`Received ${dataname} of type ${type}`);
|
console.log(`Received ${dataname} of type ${type}`);
|
||||||
console.log(`Data: ${JSON.stringify(data)}`);
|
console.log(`Data: ${JSON.stringify(data)}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Also access envelope metadata
|
// Also access envelope metadata
|
||||||
console.log(`Correlation ID: ${envelope.correlationId}`);
|
console.log(`Correlation ID: ${env.correlationId}`);
|
||||||
console.log(`Message ID: ${envelope.msgId}`);
|
console.log(`Message ID: ${env.msgId}`);
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -369,11 +369,11 @@ env, env_json_str = SmartSend("analysis_results", [("table_data", df, "table")])
|
|||||||
```javascript
|
```javascript
|
||||||
const { smartreceive } = require('./src/NATSBridge');
|
const { smartreceive } = require('./src/NATSBridge');
|
||||||
|
|
||||||
const envelope = await smartreceive(msg);
|
const env = await smartreceive(msg);
|
||||||
|
|
||||||
// Use table data from the payloads field
|
// Use table data from the payloads field
|
||||||
// Note: Tables are sent as arrays of objects in JavaScript
|
// Note: Tables are sent as arrays of objects in JavaScript
|
||||||
const table = envelope.payloads;
|
const table = env.payloads;
|
||||||
```
|
```
|
||||||
|
|
||||||
### Scenario 3: Live Binary Processing
|
### Scenario 3: Live Binary Processing
|
||||||
@@ -423,10 +423,10 @@ from nats_bridge import smartreceive
|
|||||||
|
|
||||||
# Receive binary data
|
# Receive binary data
|
||||||
def process_binary(msg):
|
def process_binary(msg):
|
||||||
envelope = smartreceive(msg)
|
env = smartreceive(msg)
|
||||||
|
|
||||||
# Process the binary data from envelope.payloads
|
# Process the binary data from env.payloads
|
||||||
for dataname, data, type in envelope["payloads"]:
|
for dataname, data, type in env["payloads"]:
|
||||||
if type == "binary":
|
if type == "binary":
|
||||||
# data is bytes
|
# data is bytes
|
||||||
print(f"Received binary data: {dataname}, size: {len(data)}")
|
print(f"Received binary data: {dataname}, size: {len(data)}")
|
||||||
@@ -439,10 +439,10 @@ const { smartreceive } = require('./src/NATSBridge');
|
|||||||
|
|
||||||
// Receive binary data
|
// Receive binary data
|
||||||
function process_binary(msg) {
|
function process_binary(msg) {
|
||||||
const envelope = await smartreceive(msg);
|
const env = await smartreceive(msg);
|
||||||
|
|
||||||
// Process the binary data from envelope.payloads
|
// Process the binary data from env.payloads
|
||||||
for (const payload of envelope.payloads) {
|
for (const payload of env.payloads) {
|
||||||
if (payload.type === "binary") {
|
if (payload.type === "binary") {
|
||||||
// data is an ArrayBuffer or Uint8Array
|
// data is an ArrayBuffer or Uint8Array
|
||||||
console.log(`Received binary data: ${payload.dataname}, size: ${payload.data.length}`);
|
console.log(`Received binary data: ${payload.dataname}, size: ${payload.data.length}`);
|
||||||
@@ -483,8 +483,8 @@ const consumer = await js.pullSubscribe("health", {
|
|||||||
|
|
||||||
// Process historical and real-time messages
|
// Process historical and real-time messages
|
||||||
for await (const msg of consumer) {
|
for await (const msg of consumer) {
|
||||||
const envelope = await smartreceive(msg);
|
const env = await smartreceive(msg);
|
||||||
// envelope.payloads contains the list of payloads
|
// env.payloads contains the list of payloads
|
||||||
// Each payload has: dataname, data, type
|
// Each payload has: dataname, data, type
|
||||||
msg.ack();
|
msg.ack();
|
||||||
}
|
}
|
||||||
@@ -501,10 +501,10 @@ import json
|
|||||||
|
|
||||||
# Device configuration handler
|
# Device configuration handler
|
||||||
def handle_device_config(msg):
|
def handle_device_config(msg):
|
||||||
envelope = smartreceive(msg)
|
env = smartreceive(msg)
|
||||||
|
|
||||||
# Process configuration from payloads
|
# Process configuration from payloads
|
||||||
for dataname, data, type in envelope["payloads"]:
|
for dataname, data, type in env["payloads"]:
|
||||||
if type == "dictionary":
|
if type == "dictionary":
|
||||||
print(f"Received configuration: {data}")
|
print(f"Received configuration: {data}")
|
||||||
# Apply configuration to device
|
# Apply configuration to device
|
||||||
@@ -523,7 +523,7 @@ def handle_device_config(msg):
|
|||||||
"device/response",
|
"device/response",
|
||||||
[("config", config, "dictionary")],
|
[("config", config, "dictionary")],
|
||||||
nats_url="nats://localhost:4222",
|
nats_url="nats://localhost:4222",
|
||||||
reply_to=envelope.get("replyTo")
|
reply_to=env.get("replyTo")
|
||||||
)
|
)
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -583,11 +583,11 @@ smartsend(
|
|||||||
const { smartreceive, smartsend } = require('./src/NATSBridge');
|
const { smartreceive, smartsend } = require('./src/NATSBridge');
|
||||||
|
|
||||||
// Receive NATS message with direct transport
|
// Receive NATS message with direct transport
|
||||||
const envelope = await smartreceive(msg);
|
const env = await smartreceive(msg);
|
||||||
|
|
||||||
// Decode Base64 payload (for direct transport)
|
// Decode Base64 payload (for direct transport)
|
||||||
// For tables, data is in envelope.payloads
|
// For tables, data is in env.payloads
|
||||||
const table = envelope.payloads; // Array of objects
|
const table = env.payloads; // Array of objects
|
||||||
|
|
||||||
// User makes selection
|
// User makes selection
|
||||||
const selection = uiComponent.getSelectedOption();
|
const selection = uiComponent.getSelectedOption();
|
||||||
|
|||||||
@@ -159,8 +159,8 @@ println("Message sent!")
|
|||||||
from nats_bridge import smartreceive
|
from nats_bridge import smartreceive
|
||||||
|
|
||||||
# Receive and process message
|
# Receive and process message
|
||||||
envelope = smartreceive(msg)
|
env = smartreceive(msg)
|
||||||
for dataname, data, type in envelope["payloads"]:
|
for dataname, data, type in env["payloads"]:
|
||||||
print(f"Received {dataname}: {data}")
|
print(f"Received {dataname}: {data}")
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -170,8 +170,8 @@ for dataname, data, type in envelope["payloads"]:
|
|||||||
const { smartreceive } = require('./src/NATSBridge');
|
const { smartreceive } = require('./src/NATSBridge');
|
||||||
|
|
||||||
// Receive and process message
|
// Receive and process message
|
||||||
const envelope = await smartreceive(msg);
|
const env = await smartreceive(msg);
|
||||||
for (const payload of envelope.payloads) {
|
for (const payload of env.payloads) {
|
||||||
console.log(`Received ${payload.dataname}: ${payload.data}`);
|
console.log(`Received ${payload.dataname}: ${payload.data}`);
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
@@ -182,8 +182,8 @@ for (const payload of envelope.payloads) {
|
|||||||
using NATSBridge
|
using NATSBridge
|
||||||
|
|
||||||
# Receive and process message
|
# Receive and process message
|
||||||
envelope = smartreceive(msg, fileserverDownloadHandler)
|
env = smartreceive(msg, fileserverDownloadHandler)
|
||||||
for (dataname, data, type) in envelope["payloads"]
|
for (dataname, data, type) in env["payloads"]
|
||||||
println("Received $dataname: $data")
|
println("Received $dataname: $data")
|
||||||
end
|
end
|
||||||
```
|
```
|
||||||
@@ -313,10 +313,10 @@ const { smartreceive, smartsend } = require('./src/NATSBridge');
|
|||||||
const sub = nc.subscribe("/device/command");
|
const sub = nc.subscribe("/device/command");
|
||||||
|
|
||||||
for await (const msg of sub) {
|
for await (const msg of sub) {
|
||||||
const envelope = await smartreceive(msg);
|
const env = await smartreceive(msg);
|
||||||
|
|
||||||
// Process command
|
// Process command
|
||||||
for (const payload of envelope.payloads) {
|
for (const payload of env.payloads) {
|
||||||
if (payload.dataname === "command") {
|
if (payload.dataname === "command") {
|
||||||
const command = payload.data;
|
const command = payload.data;
|
||||||
|
|
||||||
@@ -331,8 +331,8 @@ for await (const msg of sub) {
|
|||||||
await smartsend("/device/response", [
|
await smartsend("/device/response", [
|
||||||
{ dataname: "sensor_data", data: response, type: "dictionary" }
|
{ dataname: "sensor_data", data: response, type: "dictionary" }
|
||||||
], {
|
], {
|
||||||
reply_to: envelope.replyTo,
|
reply_to: env.replyTo,
|
||||||
reply_to_msg_id: envelope.msgId
|
reply_to_msg_id: env.msgId
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -528,8 +528,8 @@ env, env_json_str = smartsend("/analysis/config", data, nats_url="nats://localho
|
|||||||
const { smartreceive } = require('./src/NATSBridge');
|
const { smartreceive } = require('./src/NATSBridge');
|
||||||
|
|
||||||
// Receive dictionary from Julia
|
// Receive dictionary from Julia
|
||||||
const envelope = await smartreceive(msg);
|
const env = await smartreceive(msg);
|
||||||
for (const payload of envelope.payloads) {
|
for (const payload of env.payloads) {
|
||||||
if (payload.type === "dictionary") {
|
if (payload.type === "dictionary") {
|
||||||
console.log("Received config:", payload.data);
|
console.log("Received config:", payload.data);
|
||||||
// payload.data = { step_size: 0.01, iterations: 1000 }
|
// payload.data = { step_size: 0.01, iterations: 1000 }
|
||||||
@@ -554,8 +554,8 @@ const { env, env_json_str } = await smartsend("/data/transfer", [
|
|||||||
```python
|
```python
|
||||||
from nats_bridge import smartreceive
|
from nats_bridge import smartreceive
|
||||||
|
|
||||||
envelope = smartreceive(msg)
|
env = smartreceive(msg)
|
||||||
for dataname, data, type in envelope["payloads"]:
|
for dataname, data, type in env["payloads"]:
|
||||||
if type == "text":
|
if type == "text":
|
||||||
print(f"Received from JS: {data}")
|
print(f"Received from JS: {data}")
|
||||||
```
|
```
|
||||||
@@ -576,8 +576,8 @@ env, env_json_str = smartsend("/chat/python", data)
|
|||||||
```julia
|
```julia
|
||||||
using NATSBridge
|
using NATSBridge
|
||||||
|
|
||||||
envelope = smartreceive(msg, fileserverDownloadHandler)
|
env = smartreceive(msg, fileserverDownloadHandler)
|
||||||
for (dataname, data, type) in envelope["payloads"]
|
for (dataname, data, type) in env["payloads"]
|
||||||
if type == "text"
|
if type == "text"
|
||||||
println("Received from Python: $data")
|
println("Received from Python: $data")
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -216,15 +216,15 @@ class ChatHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async handleMessage(msg) {
|
async handleMessage(msg) {
|
||||||
const envelope = await smartreceive(msg, {
|
const env = await smartreceive(msg, {
|
||||||
fileserverDownloadHandler: this.downloadFile.bind(this)
|
fileserverDownloadHandler: this.downloadFile.bind(this)
|
||||||
});
|
});
|
||||||
|
|
||||||
// Extract sender info from envelope
|
// Extract sender info from envelope
|
||||||
const sender = envelope.senderName || 'Anonymous';
|
const sender = env.senderName || 'Anonymous';
|
||||||
|
|
||||||
// Process each payload
|
// Process each payload
|
||||||
for (const payload of envelope.payloads) {
|
for (const payload of env.payloads) {
|
||||||
if (payload.type === 'text') {
|
if (payload.type === 'text') {
|
||||||
this.ui.addMessage(sender, payload.data);
|
this.ui.addMessage(sender, payload.data);
|
||||||
} else if (payload.type === 'image') {
|
} else if (payload.type === 'image') {
|
||||||
@@ -356,12 +356,12 @@ class FileDownloadService {
|
|||||||
|
|
||||||
async downloadFile(sender, downloadId) {
|
async downloadFile(sender, downloadId) {
|
||||||
// Subscribe to sender's file channel
|
// Subscribe to sender's file channel
|
||||||
const envelope = await smartreceive(msg, {
|
const env = await smartreceive(msg, {
|
||||||
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
|
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
|
||||||
});
|
});
|
||||||
|
|
||||||
// Process each payload
|
// Process each payload
|
||||||
for (const payload of envelope.payloads) {
|
for (const payload of env.payloads) {
|
||||||
if (payload.type === 'binary') {
|
if (payload.type === 'binary') {
|
||||||
const filePath = `/downloads/${payload.dataname}`;
|
const filePath = `/downloads/${payload.dataname}`;
|
||||||
fs.writeFileSync(filePath, payload.data);
|
fs.writeFileSync(filePath, payload.data);
|
||||||
@@ -422,9 +422,9 @@ async function uploadFile(config) {
|
|||||||
const fileService = new FileUploadService(config.nats_url, config.fileserver_url);
|
const fileService = new FileUploadService(config.nats_url, config.fileserver_url);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const envelope = await fileService.uploadFile(filePath, recipient);
|
const env = await fileService.uploadFile(filePath, recipient);
|
||||||
console.log('Upload successful!');
|
console.log('Upload successful!');
|
||||||
console.log(`File ID: ${envelope.payloads[0].id}`);
|
console.log(`File ID: ${env.payloads[0].id}`);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Upload failed:', error.message);
|
console.error('Upload failed:', error.message);
|
||||||
}
|
}
|
||||||
@@ -533,7 +533,7 @@ class SensorSender:
|
|||||||
|
|
||||||
data = [("reading", reading.to_dict(), "dictionary")]
|
data = [("reading", reading.to_dict(), "dictionary")]
|
||||||
|
|
||||||
# With is_publish=False, returns (envelope, json_str) without publishing
|
# With is_publish=False, returns (env, env_json_str) without publishing
|
||||||
env, env_json_str = smartsend(
|
env, env_json_str = smartsend(
|
||||||
f"/sensors/{sensor_id}/prepare",
|
f"/sensors/{sensor_id}/prepare",
|
||||||
data,
|
data,
|
||||||
@@ -597,9 +597,9 @@ class SensorReceiver:
|
|||||||
self.fileserver_download_handler = fileserver_download_handler
|
self.fileserver_download_handler = fileserver_download_handler
|
||||||
|
|
||||||
def process_reading(self, msg):
|
def process_reading(self, msg):
|
||||||
envelope = smartreceive(msg, self.fileserver_download_handler)
|
env = smartreceive(msg, self.fileserver_download_handler)
|
||||||
|
|
||||||
for dataname, data, data_type in envelope["payloads"]:
|
for dataname, data, data_type in env["payloads"]:
|
||||||
if data_type == "dictionary":
|
if data_type == "dictionary":
|
||||||
reading = SensorReading(
|
reading = SensorReading(
|
||||||
sensor_id=data["sensor_id"],
|
sensor_id=data["sensor_id"],
|
||||||
@@ -699,10 +699,10 @@ class DeviceBridge:
|
|||||||
# Poll for messages
|
# Poll for messages
|
||||||
msg = self._poll_for_message()
|
msg = self._poll_for_message()
|
||||||
if msg:
|
if msg:
|
||||||
envelope = smartreceive(msg)
|
env = smartreceive(msg)
|
||||||
|
|
||||||
# Process payloads
|
# Process payloads
|
||||||
for dataname, data, data_type in envelope["payloads"]:
|
for dataname, data, data_type in env["payloads"]:
|
||||||
if dataname == "command":
|
if dataname == "command":
|
||||||
callback(data)
|
callback(data)
|
||||||
|
|
||||||
@@ -798,9 +798,9 @@ class DashboardServer:
|
|||||||
|
|
||||||
def receive_selection(self, callback):
|
def receive_selection(self, callback):
|
||||||
def handler(msg):
|
def handler(msg):
|
||||||
envelope = smartreceive(msg)
|
env = smartreceive(msg)
|
||||||
|
|
||||||
for dataname, data, data_type in envelope["payloads"]:
|
for dataname, data, data_type in env["payloads"]:
|
||||||
if data_type == "dictionary":
|
if data_type == "dictionary":
|
||||||
callback(data)
|
callback(data)
|
||||||
|
|
||||||
@@ -842,12 +842,12 @@ class DashboardUI {
|
|||||||
|
|
||||||
async fetchData() {
|
async fetchData() {
|
||||||
// Subscribe to data updates
|
// Subscribe to data updates
|
||||||
const envelope = await smartreceive(msg, {
|
const env = await smartreceive(msg, {
|
||||||
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
|
fileserverDownloadHandler: this.fetchFromUrl.bind(this)
|
||||||
});
|
});
|
||||||
|
|
||||||
// Process table data
|
// Process table data
|
||||||
for (const payload of envelope.payloads) {
|
for (const payload of env.payloads) {
|
||||||
if (payload.type === 'table') {
|
if (payload.type === 'table') {
|
||||||
// Deserialize Arrow IPC
|
// Deserialize Arrow IPC
|
||||||
this.data = this.deserializeArrow(payload.data);
|
this.data = this.deserializeArrow(payload.data);
|
||||||
|
|||||||
Reference in New Issue
Block a user