1st commit

This commit is contained in:
2026-02-10 06:55:29 +07:00
commit e207f1ec2f
17 changed files with 3174 additions and 0 deletions

14
AI_prompt.txt Normal file
View File

@@ -0,0 +1,14 @@
Consider the following scenarios:
Scenario 1: The "Command & Control" Loop (Low Latency)Focus: Small payloads, Core NATS, bi-directional JSON.The Action: A user on a JavaScript dashboard clicks a "Start Simulation" button. This sends a JSON configuration (parameters like step_size and iterations) to Julia.The Flow: * JS (Sender): Recognizes the message is small ($< 10KB$). Packages it as a direct transport JSON envelope.Julia (Receiver): Listens on the NATS subject, decodes the JSON, and immediately acknowledges receipt with a "Running" status.Project Requirement Met: Fast, low-overhead communication for control signals without involving the fileserver.
Scenario 2: The "Deep Dive" Analysis (High Bandwidth)Focus: Large Arrow tables, Claim-Check pattern, Julia to JS.The Action: Julia finishes a heavy computation and produces a 500MB DataFrame with 10 million rows. It needs to send this to the JS frontend for visualization (e.g., using Perspective.js or D3).The Flow:Julia (Sender): Converts the DataFrame to an Arrow IPC stream. It sees the size is $> 1MB$, so it uploads the bytes to the HTTP fileserver. It then publishes a NATS message with transport: "link" and the URL.JS (Receiver): Receives the URL, fetches the data via fetch(), and uses tableFromIPC() to load the data into memory with zero-copy.Project Requirement Met: Handling massive datasets that exceed NATS message limits while maintaining data integrity across languages.
Scenario 3: Live Audio/Signal Processing (Multimedia & Metadata)Focus: Raw binary, bi-directional streaming, NATS Headers.The Action: The JS client captures a 2-second "chunk" of microphone audio. It needs Julia to perform a Fast Fourier Transform (FFT) or AI transcription.The Flow:JS (Sender): Sends the raw binary WAV/PCM data. It uses NATS Headers to store the metadata ($fs = 44.1kHz$, $channels = 1$) to keep the payload purely binary.Julia (Receiver): Processes the audio and sends back a JSON result (the transcription) and an Arrow Table (the frequency spectrum data).Project Requirement Met: Bi-directional flow involving mixed media (Audio) and technical results (Arrow).
Scenario 4: The "Catch-Up" (Persistence & JetStream)Focus: NATS JetStream, late-joining consumers, state sync.The Action: Julia is constantly publishing "System Health" updates. The JS dashboard is closed for 10 minutes. When the user re-opens the dashboard, they need to see the last 10 minutes of history.The Flow:NATS (Server): Uses a JetStream with a Limits retention policy.JS (Consumer): Connects and requests a "Replay" from the last 10 minutes. It receives a mix of direct (small updates) and link (historical snapshots) messages.Project Requirement Met: Temporal decoupling—consumers can receive data that was sent while they were offline.
Role: Principal Systems Architect & Lead Software Engineer.Objective: Implement a high-performance, bi-directional data bridge between a Julia service and a JavaScript (Node.js) service using NATS (Core & JetStream).⚠️ STRICT ARCHITECTURAL CONSTRAINTS (Non-Negotiable)Transport Strategy (Claim-Check Pattern):Direct Path: If payload is < 1MB, send data directly via NATS inside the message envelope (Base64 encoded).Link Path: If payload is > 1MB, upload to a shared HTTP fileserver/store. The NATS message must only contain the metadata and the download URL.Tabular Data Format: * MUST use Apache Arrow IPC Stream for all tables/DataFrames. No CSV or standard JSON-serialization of tables allowed.System Symmetry: * Both services must function as Producers AND Consumers.Modular Elegance: * Implementation must be abstracted into a SmartSend function and a SmartReceive handler. The developer calling these functions should not need to care if the data is going via NATS direct or HTTP link.Technical Stack & Use CasesJulia: NATS.jl, Arrow.jl, JSON3.jl, HTTP.jl.Node.js: nats.js, apache-arrow.Scenarios to Support: * Large Data: Sending a 500MB Arrow table from Julia $\rightarrow$ JS.Media: Sending a 5MB WAV file from JS $\rightarrow$ Julia.Signals: Sending small JSON control commands ($< 10KB$) directly via NATS.Implementation Requirements1. Unified JSON Envelope:Define a schema containing: correlation_id (UUID), type (table/binary/json), transport (direct/link), payload (if direct), and url (if link).2. The Julia Module:Implement SmartSend(subject, data, type): Handles Arrow serialization to an IOBuffer, checks size, and manages HTTP uploads for large blobs.Implement SmartReceive(msg): Parses envelope, handles the HTTP fetch with Exponential Backoff (to avoid race conditions), and restores the DataFrame.Include a basic HTTP.listen server to serve as the temporary storage.3. The JavaScript Module:Implement a symmetric SmartSend using nats.js and apache-arrow.Implement a JetStream Pull Consumer for SmartReceive to ensure backpressure and memory safety.4. Performance & Reliability:Demonstrate "Zero-Copy" reading of the Arrow IPC stream on the JS side.Log the correlation_id at every stage for distributed tracing.

831
Manifest.toml Normal file
View File

@@ -0,0 +1,831 @@
# This file is machine-generated - editing it directly is not advised
julia_version = "1.12.4"
manifest_format = "2.0"
project_hash = "be1e3c2d8b7f4f0ee7375c94aaf704ce73ba57b9"
[[deps.AliasTables]]
deps = ["PtrArrays", "Random"]
git-tree-sha1 = "9876e1e164b144ca45e9e3198d0b689cadfed9ff"
uuid = "66dad0bd-aa9a-41b7-9441-69ab47430ed8"
version = "1.1.3"
[[deps.ArgTools]]
uuid = "0dad84c5-d112-42e6-8d28-ef12dabb789f"
version = "1.1.2"
[[deps.Arrow]]
deps = ["ArrowTypes", "BitIntegers", "CodecLz4", "CodecZstd", "ConcurrentUtilities", "DataAPI", "Dates", "EnumX", "Mmap", "PooledArrays", "SentinelArrays", "StringViews", "Tables", "TimeZones", "TranscodingStreams", "UUIDs"]
git-tree-sha1 = "4a69a3eadc1f7da78d950d1ef270c3a62c1f7e01"
uuid = "69666777-d1a9-59fb-9406-91d4454c9d45"
version = "2.8.1"
[[deps.ArrowTypes]]
deps = ["Sockets", "UUIDs"]
git-tree-sha1 = "404265cd8128a2515a81d5eae16de90fdef05101"
uuid = "31f734f8-188a-4ce0-8406-c8a06bd891cd"
version = "2.3.0"
[[deps.Artifacts]]
uuid = "56f22d72-fd6d-98f1-02f0-08ddc0907c33"
version = "1.11.0"
[[deps.Base64]]
uuid = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
version = "1.11.0"
[[deps.BitFlags]]
git-tree-sha1 = "0691e34b3bb8be9307330f88d1a3c3f25466c24d"
uuid = "d1d4a3ce-64b1-5f1a-9ba4-7e7e69966f35"
version = "0.1.9"
[[deps.BitIntegers]]
deps = ["Random"]
git-tree-sha1 = "091d591a060e43df1dd35faab3ca284925c48e46"
uuid = "c3b6d118-76ef-56ca-8cc7-ebb389d030a1"
version = "0.3.7"
[[deps.BufferedStreams]]
git-tree-sha1 = "6863c5b7fc997eadcabdbaf6c5f201dc30032643"
uuid = "e1450e63-4bb3-523b-b2a4-4ffa8c0fd77d"
version = "1.2.2"
[[deps.CSV]]
deps = ["CodecZlib", "Dates", "FilePathsBase", "InlineStrings", "Mmap", "Parsers", "PooledArrays", "PrecompileTools", "SentinelArrays", "Tables", "Unicode", "WeakRefStrings", "WorkerUtilities"]
git-tree-sha1 = "deddd8725e5e1cc49ee205a1964256043720a6c3"
uuid = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
version = "0.10.15"
[[deps.CodeTracking]]
deps = ["InteractiveUtils", "UUIDs"]
git-tree-sha1 = "b7231a755812695b8046e8471ddc34c8268cbad5"
uuid = "da1fd8a2-8d9e-5ec2-8556-3022fb5608a2"
version = "3.0.0"
[[deps.CodecBase]]
deps = ["TranscodingStreams"]
git-tree-sha1 = "40956acdbef3d8c7cc38cba42b56034af8f8581a"
uuid = "6c391c72-fb7b-5838-ba82-7cfb1bcfecbf"
version = "0.3.4"
[[deps.CodecLz4]]
deps = ["Lz4_jll", "TranscodingStreams"]
git-tree-sha1 = "d58afcd2833601636b48ee8cbeb2edcb086522c2"
uuid = "5ba52731-8f18-5e0d-9241-30f10d1ec561"
version = "0.4.6"
[[deps.CodecZlib]]
deps = ["TranscodingStreams", "Zlib_jll"]
git-tree-sha1 = "962834c22b66e32aa10f7611c08c8ca4e20749a9"
uuid = "944b1d66-785c-5afd-91f1-9de20f533193"
version = "0.7.8"
[[deps.CodecZstd]]
deps = ["TranscodingStreams", "Zstd_jll"]
git-tree-sha1 = "da54a6cd93c54950c15adf1d336cfd7d71f51a56"
uuid = "6b39b394-51ab-5f42-8807-6242bab2b4c2"
version = "0.8.7"
[[deps.Compat]]
deps = ["TOML", "UUIDs"]
git-tree-sha1 = "9d8a54ce4b17aa5bdce0ea5c34bc5e7c340d16ad"
uuid = "34da2185-b29b-5c13-b0c7-acf172513d20"
version = "4.18.1"
weakdeps = ["Dates", "LinearAlgebra"]
[deps.Compat.extensions]
CompatLinearAlgebraExt = "LinearAlgebra"
[[deps.Compiler]]
git-tree-sha1 = "382d79bfe72a406294faca39ef0c3cef6e6ce1f1"
uuid = "807dbc54-b67e-4c79-8afb-eafe4df6f2e1"
version = "0.1.1"
[[deps.CompilerSupportLibraries_jll]]
deps = ["Artifacts", "Libdl"]
uuid = "e66e0078-7015-5450-92f7-15fbd957f2ae"
version = "1.3.0+1"
[[deps.ConcurrentUtilities]]
deps = ["Serialization", "Sockets"]
git-tree-sha1 = "d9d26935a0bcffc87d2613ce14c527c99fc543fd"
uuid = "f0e56b4a-5159-44fe-b623-3e5288b988bb"
version = "2.5.0"
[[deps.Crayons]]
git-tree-sha1 = "249fe38abf76d48563e2f4556bebd215aa317e15"
uuid = "a8cc5b0e-0ffa-5ad4-8c14-923d3ee1735f"
version = "4.1.1"
[[deps.DataAPI]]
git-tree-sha1 = "abe83f3a2f1b857aac70ef8b269080af17764bbe"
uuid = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a"
version = "1.16.0"
[[deps.DataFrames]]
deps = ["Compat", "DataAPI", "DataStructures", "Future", "InlineStrings", "InvertedIndices", "IteratorInterfaceExtensions", "LinearAlgebra", "Markdown", "Missings", "PooledArrays", "PrecompileTools", "PrettyTables", "Printf", "Random", "Reexport", "SentinelArrays", "SortingAlgorithms", "Statistics", "TableTraits", "Tables", "Unicode"]
git-tree-sha1 = "d8928e9169ff76c6281f39a659f9bca3a573f24c"
uuid = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
version = "1.8.1"
[[deps.DataStructures]]
deps = ["OrderedCollections"]
git-tree-sha1 = "e357641bb3e0638d353c4b29ea0e40ea644066a6"
uuid = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
version = "0.19.3"
[[deps.DataValueInterfaces]]
git-tree-sha1 = "bfc1187b79289637fa0ef6d4436ebdfe6905cbd6"
uuid = "e2d170a0-9d28-54be-80f0-106bbe20a464"
version = "1.0.0"
[[deps.Dates]]
deps = ["Printf"]
uuid = "ade2ca70-3891-5945-98fb-dc099432e06a"
version = "1.11.0"
[[deps.Distributions]]
deps = ["AliasTables", "FillArrays", "LinearAlgebra", "PDMats", "Printf", "QuadGK", "Random", "SpecialFunctions", "Statistics", "StatsAPI", "StatsBase", "StatsFuns"]
git-tree-sha1 = "fbcc7610f6d8348428f722ecbe0e6cfe22e672c6"
uuid = "31c24e10-a181-5473-b8eb-7969acd0382f"
version = "0.25.123"
[deps.Distributions.extensions]
DistributionsChainRulesCoreExt = "ChainRulesCore"
DistributionsDensityInterfaceExt = "DensityInterface"
DistributionsTestExt = "Test"
[deps.Distributions.weakdeps]
ChainRulesCore = "d360d2e6-b24c-11e9-a2a3-2a2ae2dbcce4"
DensityInterface = "b429d917-457f-4dbc-8f4c-0cc954292b1d"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
[[deps.DocStringExtensions]]
git-tree-sha1 = "7442a5dfe1ebb773c29cc2962a8980f47221d76c"
uuid = "ffbed154-4ef7-542d-bbb7-c09d3a79fcae"
version = "0.9.5"
[[deps.Downloads]]
deps = ["ArgTools", "FileWatching", "LibCURL", "NetworkOptions"]
uuid = "f43a241f-c20a-4ad4-852c-f6b1247861c6"
version = "1.7.0"
[[deps.EnumX]]
git-tree-sha1 = "7bebc8aad6ee6217c78c5ddcf7ed289d65d0263e"
uuid = "4e289a0a-7415-4d19-859d-a7e5c4648b56"
version = "1.0.6"
[[deps.ExceptionUnwrapping]]
deps = ["Test"]
git-tree-sha1 = "d36f682e590a83d63d1c7dbd287573764682d12a"
uuid = "460bff9d-24e4-43bc-9d9f-a8973cb893f4"
version = "0.1.11"
[[deps.ExprTools]]
git-tree-sha1 = "27415f162e6028e81c72b82ef756bf321213b6ec"
uuid = "e2ba6199-217a-4e67-a87a-7c52f15ade04"
version = "0.1.10"
[[deps.FilePathsBase]]
deps = ["Compat", "Dates"]
git-tree-sha1 = "3bab2c5aa25e7840a4b065805c0cdfc01f3068d2"
uuid = "48062228-2e41-5def-b9a4-89aafe57970f"
version = "0.9.24"
weakdeps = ["Mmap", "Test"]
[deps.FilePathsBase.extensions]
FilePathsBaseMmapExt = "Mmap"
FilePathsBaseTestExt = "Test"
[[deps.FileWatching]]
uuid = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee"
version = "1.11.0"
[[deps.FillArrays]]
deps = ["LinearAlgebra"]
git-tree-sha1 = "2f979084d1e13948a3352cf64a25df6bd3b4dca3"
uuid = "1a297f60-69ca-5386-bcde-b61e274b549b"
version = "1.16.0"
[deps.FillArrays.extensions]
FillArraysPDMatsExt = "PDMats"
FillArraysSparseArraysExt = "SparseArrays"
FillArraysStaticArraysExt = "StaticArrays"
FillArraysStatisticsExt = "Statistics"
[deps.FillArrays.weakdeps]
PDMats = "90014a1f-27ba-587c-ab20-58faa44d9150"
SparseArrays = "2f01184e-e22b-5df5-ae63-d93ebab69eaf"
StaticArrays = "90137ffa-7385-5640-81b9-e52037218182"
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
[[deps.Future]]
deps = ["Random"]
uuid = "9fa8497b-333b-5362-9e8d-4d0656e87820"
version = "1.11.0"
[[deps.GeneralUtils]]
deps = ["CSV", "DataFrames", "DataStructures", "Dates", "Distributions", "JSON", "NATS", "PrettyPrinting", "Random", "Revise", "SHA", "UUIDs"]
git-tree-sha1 = "e28ca4df47d0c46d04716422bef6adb660f33dc3"
repo-rev = "main"
repo-url = "https://git.yiem.cc/ton/GeneralUtils"
uuid = "c6c72f09-b708-4ac8-ac7c-2084d70108fe"
version = "0.3.1"
[[deps.HTTP]]
deps = ["Base64", "CodecZlib", "ConcurrentUtilities", "Dates", "ExceptionUnwrapping", "Logging", "LoggingExtras", "MbedTLS", "NetworkOptions", "OpenSSL", "PrecompileTools", "Random", "SimpleBufferStream", "Sockets", "URIs", "UUIDs"]
git-tree-sha1 = "5e6fe50ae7f23d171f44e311c2960294aaa0beb5"
uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3"
version = "1.10.19"
[[deps.HashArrayMappedTries]]
git-tree-sha1 = "2eaa69a7cab70a52b9687c8bf950a5a93ec895ae"
uuid = "076d061b-32b6-4027-95e0-9a2c6f6d7e74"
version = "0.2.0"
[[deps.HypergeometricFunctions]]
deps = ["LinearAlgebra", "OpenLibm_jll", "SpecialFunctions"]
git-tree-sha1 = "68c173f4f449de5b438ee67ed0c9c748dc31a2ec"
uuid = "34004b35-14d8-5ef3-9330-4cdb6864b03a"
version = "0.3.28"
[[deps.InlineStrings]]
git-tree-sha1 = "8f3d257792a522b4601c24a577954b0a8cd7334d"
uuid = "842dd82b-1e85-43dc-bf29-5d0ee9dffc48"
version = "1.4.5"
weakdeps = ["ArrowTypes", "Parsers"]
[deps.InlineStrings.extensions]
ArrowTypesExt = "ArrowTypes"
ParsersExt = "Parsers"
[[deps.InteractiveUtils]]
deps = ["Markdown"]
uuid = "b77e0a4c-d291-57a0-90e8-8db25a27a240"
version = "1.11.0"
[[deps.InvertedIndices]]
git-tree-sha1 = "6da3c4316095de0f5ee2ebd875df8721e7e0bdbe"
uuid = "41ab1584-1d38-5bbf-9106-f11c6c58b48f"
version = "1.3.1"
[[deps.IrrationalConstants]]
git-tree-sha1 = "b2d91fe939cae05960e760110b328288867b5758"
uuid = "92d709cd-6900-40b7-9082-c6be49f344b6"
version = "0.2.6"
[[deps.IteratorInterfaceExtensions]]
git-tree-sha1 = "a3f24677c21f5bbe9d2a714f95dcd58337fb2856"
uuid = "82899510-4779-5014-852e-03e436cf321d"
version = "1.0.0"
[[deps.JLLWrappers]]
deps = ["Artifacts", "Preferences"]
git-tree-sha1 = "0533e564aae234aff59ab625543145446d8b6ec2"
uuid = "692b3bcd-3c85-4b1f-b108-f13ce0eb3210"
version = "1.7.1"
[[deps.JSON]]
deps = ["Dates", "Logging", "Parsers", "PrecompileTools", "StructUtils", "UUIDs", "Unicode"]
git-tree-sha1 = "b3ad4a0255688dcb895a52fafbaae3023b588a90"
uuid = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
version = "1.4.0"
weakdeps = ["ArrowTypes"]
[deps.JSON.extensions]
JSONArrowExt = ["ArrowTypes"]
[[deps.JSON3]]
deps = ["Dates", "Mmap", "Parsers", "PrecompileTools", "StructTypes", "UUIDs"]
git-tree-sha1 = "411eccfe8aba0814ffa0fdf4860913ed09c34975"
uuid = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
version = "1.14.3"
weakdeps = ["ArrowTypes"]
[deps.JSON3.extensions]
JSON3ArrowExt = ["ArrowTypes"]
[[deps.JuliaInterpreter]]
deps = ["CodeTracking", "InteractiveUtils", "Random", "UUIDs"]
git-tree-sha1 = "80580012d4ed5a3e8b18c7cd86cebe4b816d17a6"
uuid = "aa1ae85d-cabe-5617-a682-6adf51b2e16a"
version = "0.10.9"
[[deps.JuliaSyntaxHighlighting]]
deps = ["StyledStrings"]
uuid = "ac6e5ff7-fb65-4e79-a425-ec3bc9c03011"
version = "1.12.0"
[[deps.LaTeXStrings]]
git-tree-sha1 = "dda21b8cbd6a6c40d9d02a73230f9d70fed6918c"
uuid = "b964fa9f-0449-5b57-a5c2-d3ea65f4040f"
version = "1.4.0"
[[deps.LibCURL]]
deps = ["LibCURL_jll", "MozillaCACerts_jll"]
uuid = "b27032c2-a3e7-50c8-80cd-2d36dbcbfd21"
version = "0.6.4"
[[deps.LibCURL_jll]]
deps = ["Artifacts", "LibSSH2_jll", "Libdl", "OpenSSL_jll", "Zlib_jll", "nghttp2_jll"]
uuid = "deac9b47-8bc7-5906-a0fe-35ac56dc84c0"
version = "8.15.0+0"
[[deps.LibGit2]]
deps = ["LibGit2_jll", "NetworkOptions", "Printf", "SHA"]
uuid = "76f85450-5226-5b5a-8eaa-529ad045b433"
version = "1.11.0"
[[deps.LibGit2_jll]]
deps = ["Artifacts", "LibSSH2_jll", "Libdl", "OpenSSL_jll"]
uuid = "e37daf67-58a4-590a-8e99-b0245dd2ffc5"
version = "1.9.0+0"
[[deps.LibSSH2_jll]]
deps = ["Artifacts", "Libdl", "OpenSSL_jll"]
uuid = "29816b5a-b9ab-546f-933c-edad1886dfa8"
version = "1.11.3+1"
[[deps.Libdl]]
uuid = "8f399da3-3557-5675-b5ff-fb832c97cbdb"
version = "1.11.0"
[[deps.LinearAlgebra]]
deps = ["Libdl", "OpenBLAS_jll", "libblastrampoline_jll"]
uuid = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
version = "1.12.0"
[[deps.LogExpFunctions]]
deps = ["DocStringExtensions", "IrrationalConstants", "LinearAlgebra"]
git-tree-sha1 = "13ca9e2586b89836fd20cccf56e57e2b9ae7f38f"
uuid = "2ab3a3ac-af41-5b50-aa03-7779005ae688"
version = "0.3.29"
[deps.LogExpFunctions.extensions]
LogExpFunctionsChainRulesCoreExt = "ChainRulesCore"
LogExpFunctionsChangesOfVariablesExt = "ChangesOfVariables"
LogExpFunctionsInverseFunctionsExt = "InverseFunctions"
[deps.LogExpFunctions.weakdeps]
ChainRulesCore = "d360d2e6-b24c-11e9-a2a3-2a2ae2dbcce4"
ChangesOfVariables = "9e997f8a-9a97-42d5-a9f1-ce6bfc15e2c0"
InverseFunctions = "3587e190-3f89-42d0-90ee-14403ec27112"
[[deps.Logging]]
uuid = "56ddb016-857b-54e1-b83d-db4d58db5568"
version = "1.11.0"
[[deps.LoggingExtras]]
deps = ["Dates", "Logging"]
git-tree-sha1 = "f00544d95982ea270145636c181ceda21c4e2575"
uuid = "e6f89c97-d47a-5376-807f-9c37f3926c36"
version = "1.2.0"
[[deps.LoweredCodeUtils]]
deps = ["CodeTracking", "Compiler", "JuliaInterpreter"]
git-tree-sha1 = "65ae3db6ab0e5b1b5f217043c558d9d1d33cc88d"
uuid = "6f1432cf-f94c-5a45-995e-cdbf5db27b0b"
version = "3.5.0"
[[deps.Lz4_jll]]
deps = ["Artifacts", "JLLWrappers", "Libdl"]
git-tree-sha1 = "191686b1ac1ea9c89fc52e996ad15d1d241d1e33"
uuid = "5ced341a-0733-55b8-9ab6-a4889d929147"
version = "1.10.1+0"
[[deps.Markdown]]
deps = ["Base64", "JuliaSyntaxHighlighting", "StyledStrings"]
uuid = "d6f4376e-aef5-505a-96c1-9c027394607a"
version = "1.11.0"
[[deps.MbedTLS]]
deps = ["Dates", "MbedTLS_jll", "MozillaCACerts_jll", "NetworkOptions", "Random", "Sockets"]
git-tree-sha1 = "c067a280ddc25f196b5e7df3877c6b226d390aaf"
uuid = "739be429-bea8-5141-9913-cc70e7f3736d"
version = "1.1.9"
[[deps.MbedTLS_jll]]
deps = ["Artifacts", "JLLWrappers", "Libdl"]
git-tree-sha1 = "ff69a2b1330bcb730b9ac1ab7dd680176f5896b8"
uuid = "c8ffd9c3-330d-5841-b78e-0817d7145fa1"
version = "2.28.1010+0"
[[deps.Missings]]
deps = ["DataAPI"]
git-tree-sha1 = "ec4f7fbeab05d7747bdf98eb74d130a2a2ed298d"
uuid = "e1d29d7a-bbdc-5cf2-9ac0-f12de2c33e28"
version = "1.2.0"
[[deps.Mmap]]
uuid = "a63ad114-7e13-5084-954f-fe012c677804"
version = "1.11.0"
[[deps.Mocking]]
deps = ["Compat", "ExprTools"]
git-tree-sha1 = "2c140d60d7cb82badf06d8783800d0bcd1a7daa2"
uuid = "78c3b35d-d492-501b-9361-3d52fe80e533"
version = "0.8.1"
[[deps.MozillaCACerts_jll]]
uuid = "14a3606d-f60d-562e-9121-12d972cd8159"
version = "2025.11.4"
[[deps.NATS]]
deps = ["Base64", "BufferedStreams", "CodecBase", "Dates", "DocStringExtensions", "JSON3", "MbedTLS", "NanoDates", "Random", "ScopedValues", "Sockets", "Sodium", "StructTypes", "URIs"]
git-tree-sha1 = "d9d9a189fb9155a460e6b5e8966bf6a66737abf8"
uuid = "55e73f9c-eeeb-467f-b4cc-a633fde63d2a"
version = "0.1.0"
[[deps.NanoDates]]
deps = ["Dates", "Parsers"]
git-tree-sha1 = "850a0557ae5934f6e67ac0dc5ca13d0328422d1f"
uuid = "46f1a544-deae-4307-8689-c12aa3c955c6"
version = "1.0.3"
[[deps.NetworkOptions]]
uuid = "ca575930-c2e3-43a9-ace4-1e988b2c1908"
version = "1.3.0"
[[deps.OpenBLAS_jll]]
deps = ["Artifacts", "CompilerSupportLibraries_jll", "Libdl"]
uuid = "4536629a-c528-5b80-bd46-f80d51c5b363"
version = "0.3.29+0"
[[deps.OpenLibm_jll]]
deps = ["Artifacts", "Libdl"]
uuid = "05823500-19ac-5b8b-9628-191a04bc5112"
version = "0.8.7+0"
[[deps.OpenSSL]]
deps = ["BitFlags", "Dates", "MozillaCACerts_jll", "NetworkOptions", "OpenSSL_jll", "Sockets"]
git-tree-sha1 = "1d1aaa7d449b58415f97d2839c318b70ffb525a0"
uuid = "4d8831e6-92b7-49fb-bdf8-b643e874388c"
version = "1.6.1"
[[deps.OpenSSL_jll]]
deps = ["Artifacts", "Libdl"]
uuid = "458c3c95-2e84-50aa-8efc-19380b2a3a95"
version = "3.5.4+0"
[[deps.OpenSpecFun_jll]]
deps = ["Artifacts", "CompilerSupportLibraries_jll", "JLLWrappers", "Libdl"]
git-tree-sha1 = "1346c9208249809840c91b26703912dff463d335"
uuid = "efe28fd5-8261-553b-a9e1-b2916fc3738e"
version = "0.5.6+0"
[[deps.OrderedCollections]]
git-tree-sha1 = "05868e21324cede2207c6f0f466b4bfef6d5e7ee"
uuid = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
version = "1.8.1"
[[deps.PDMats]]
deps = ["LinearAlgebra", "SparseArrays", "SuiteSparse"]
git-tree-sha1 = "e4cff168707d441cd6bf3ff7e4832bdf34278e4a"
uuid = "90014a1f-27ba-587c-ab20-58faa44d9150"
version = "0.11.37"
weakdeps = ["StatsBase"]
[deps.PDMats.extensions]
StatsBaseExt = "StatsBase"
[[deps.Parsers]]
deps = ["Dates", "PrecompileTools", "UUIDs"]
git-tree-sha1 = "7d2f8f21da5db6a806faf7b9b292296da42b2810"
uuid = "69de0a69-1ddd-5017-9359-2bf0b02dc9f0"
version = "2.8.3"
[[deps.PooledArrays]]
deps = ["DataAPI", "Future"]
git-tree-sha1 = "36d8b4b899628fb92c2749eb488d884a926614d3"
uuid = "2dfb63ee-cc39-5dd5-95bd-886bf059d720"
version = "1.4.3"
[[deps.PrecompileTools]]
deps = ["Preferences"]
git-tree-sha1 = "07a921781cab75691315adc645096ed5e370cb77"
uuid = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
version = "1.3.3"
[[deps.Preferences]]
deps = ["TOML"]
git-tree-sha1 = "522f093a29b31a93e34eaea17ba055d850edea28"
uuid = "21216c6a-2e73-6563-6e65-726566657250"
version = "1.5.1"
[[deps.PrettyPrinting]]
git-tree-sha1 = "142ee93724a9c5d04d78df7006670a93ed1b244e"
uuid = "54e16d92-306c-5ea0-a30b-337be88ac337"
version = "0.4.2"
[[deps.PrettyTables]]
deps = ["Crayons", "LaTeXStrings", "Markdown", "PrecompileTools", "Printf", "REPL", "Reexport", "StringManipulation", "Tables"]
git-tree-sha1 = "c5a07210bd060d6a8491b0ccdee2fa0235fc00bf"
uuid = "08abe8d2-0d0c-5749-adfa-8a2ac140af0d"
version = "3.1.2"
[[deps.Printf]]
deps = ["Unicode"]
uuid = "de0858da-6303-5e67-8744-51eddeeeb8d7"
version = "1.11.0"
[[deps.PtrArrays]]
git-tree-sha1 = "1d36ef11a9aaf1e8b74dacc6a731dd1de8fd493d"
uuid = "43287f4e-b6f4-7ad1-bb20-aadabca52c3d"
version = "1.3.0"
[[deps.QuadGK]]
deps = ["DataStructures", "LinearAlgebra"]
git-tree-sha1 = "9da16da70037ba9d701192e27befedefb91ec284"
uuid = "1fd47b50-473d-5c70-9696-f719f8f3bcdc"
version = "2.11.2"
[deps.QuadGK.extensions]
QuadGKEnzymeExt = "Enzyme"
[deps.QuadGK.weakdeps]
Enzyme = "7da242da-08ed-463a-9acd-ee780be4f1d9"
[[deps.REPL]]
deps = ["InteractiveUtils", "JuliaSyntaxHighlighting", "Markdown", "Sockets", "StyledStrings", "Unicode"]
uuid = "3fa0cd96-eef1-5676-8a61-b3b8758bbffb"
version = "1.11.0"
[[deps.Random]]
deps = ["SHA"]
uuid = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
version = "1.11.0"
[[deps.Reexport]]
git-tree-sha1 = "45e428421666073eab6f2da5c9d310d99bb12f9b"
uuid = "189a3867-3050-52da-a836-e630ba90ab69"
version = "1.2.2"
[[deps.Revise]]
deps = ["CodeTracking", "FileWatching", "InteractiveUtils", "JuliaInterpreter", "LibGit2", "LoweredCodeUtils", "OrderedCollections", "Preferences", "REPL", "UUIDs"]
git-tree-sha1 = "14d1bfb0a30317edc77e11094607ace3c800f193"
uuid = "295af30f-e4ad-537b-8983-00126c2a3abe"
version = "3.13.2"
[deps.Revise.extensions]
DistributedExt = "Distributed"
[deps.Revise.weakdeps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
[[deps.Rmath]]
deps = ["Random", "Rmath_jll"]
git-tree-sha1 = "5b3d50eb374cea306873b371d3f8d3915a018f0b"
uuid = "79098fc4-a85e-5d69-aa6a-4863f24498fa"
version = "0.9.0"
[[deps.Rmath_jll]]
deps = ["Artifacts", "JLLWrappers", "Libdl"]
git-tree-sha1 = "58cdd8fb2201a6267e1db87ff148dd6c1dbd8ad8"
uuid = "f50d1b31-88e8-58de-be2c-1cc44531875f"
version = "0.5.1+0"
[[deps.SHA]]
uuid = "ea8e919c-243c-51af-8825-aaa63cd721ce"
version = "0.7.0"
[[deps.ScopedValues]]
deps = ["HashArrayMappedTries", "Logging"]
git-tree-sha1 = "c3b2323466378a2ba15bea4b2f73b081e022f473"
uuid = "7e506255-f358-4e82-b7e4-beb19740aa63"
version = "1.5.0"
[[deps.Scratch]]
deps = ["Dates"]
git-tree-sha1 = "9b81b8393e50b7d4e6d0a9f14e192294d3b7c109"
uuid = "6c6a2e73-6563-6170-7368-637461726353"
version = "1.3.0"
[[deps.SentinelArrays]]
deps = ["Dates", "Random"]
git-tree-sha1 = "ebe7e59b37c400f694f52b58c93d26201387da70"
uuid = "91c51154-3ec4-41a3-a24f-3f23e20d615c"
version = "1.4.9"
[[deps.Serialization]]
uuid = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
version = "1.11.0"
[[deps.SimpleBufferStream]]
git-tree-sha1 = "f305871d2f381d21527c770d4788c06c097c9bc1"
uuid = "777ac1f9-54b0-4bf8-805c-2214025038e7"
version = "1.2.0"
[[deps.Sockets]]
uuid = "6462fe0b-24de-5631-8697-dd941f90decc"
version = "1.11.0"
[[deps.Sodium]]
deps = ["Base64", "libsodium_jll"]
git-tree-sha1 = "907703e0d50846f300650d7225bdcab145b7bca9"
uuid = "4f5b5e99-b0ad-42cd-b47a-334e172ec8bd"
version = "1.1.2"
[[deps.SortingAlgorithms]]
deps = ["DataStructures"]
git-tree-sha1 = "64d974c2e6fdf07f8155b5b2ca2ffa9069b608d9"
uuid = "a2af1166-a08f-5f64-846c-94a0d3cef48c"
version = "1.2.2"
[[deps.SparseArrays]]
deps = ["Libdl", "LinearAlgebra", "Random", "Serialization", "SuiteSparse_jll"]
uuid = "2f01184e-e22b-5df5-ae63-d93ebab69eaf"
version = "1.12.0"
[[deps.SpecialFunctions]]
deps = ["IrrationalConstants", "LogExpFunctions", "OpenLibm_jll", "OpenSpecFun_jll"]
git-tree-sha1 = "f2685b435df2613e25fc10ad8c26dddb8640f547"
uuid = "276daf66-3868-5448-9aa4-cd146d93841b"
version = "2.6.1"
[deps.SpecialFunctions.extensions]
SpecialFunctionsChainRulesCoreExt = "ChainRulesCore"
[deps.SpecialFunctions.weakdeps]
ChainRulesCore = "d360d2e6-b24c-11e9-a2a3-2a2ae2dbcce4"
[[deps.Statistics]]
deps = ["LinearAlgebra"]
git-tree-sha1 = "ae3bb1eb3bba077cd276bc5cfc337cc65c3075c0"
uuid = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
version = "1.11.1"
weakdeps = ["SparseArrays"]
[deps.Statistics.extensions]
SparseArraysExt = ["SparseArrays"]
[[deps.StatsAPI]]
deps = ["LinearAlgebra"]
git-tree-sha1 = "178ed29fd5b2a2cfc3bd31c13375ae925623ff36"
uuid = "82ae8749-77ed-4fe6-ae5f-f523153014b0"
version = "1.8.0"
[[deps.StatsBase]]
deps = ["AliasTables", "DataAPI", "DataStructures", "IrrationalConstants", "LinearAlgebra", "LogExpFunctions", "Missings", "Printf", "Random", "SortingAlgorithms", "SparseArrays", "Statistics", "StatsAPI"]
git-tree-sha1 = "aceda6f4e598d331548e04cc6b2124a6148138e3"
uuid = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
version = "0.34.10"
[[deps.StatsFuns]]
deps = ["HypergeometricFunctions", "IrrationalConstants", "LogExpFunctions", "Reexport", "Rmath", "SpecialFunctions"]
git-tree-sha1 = "91f091a8716a6bb38417a6e6f274602a19aaa685"
uuid = "4c63d2b9-4356-54db-8cca-17b64c39e42c"
version = "1.5.2"
[deps.StatsFuns.extensions]
StatsFunsChainRulesCoreExt = "ChainRulesCore"
StatsFunsInverseFunctionsExt = "InverseFunctions"
[deps.StatsFuns.weakdeps]
ChainRulesCore = "d360d2e6-b24c-11e9-a2a3-2a2ae2dbcce4"
InverseFunctions = "3587e190-3f89-42d0-90ee-14403ec27112"
[[deps.StringManipulation]]
deps = ["PrecompileTools"]
git-tree-sha1 = "a3c1536470bf8c5e02096ad4853606d7c8f62721"
uuid = "892a3eda-7b42-436c-8928-eab12a02cf0e"
version = "0.4.2"
[[deps.StringViews]]
git-tree-sha1 = "f2dcb92855b31ad92fe8f079d4f75ac57c93e4b8"
uuid = "354b36f9-a18e-4713-926e-db85100087ba"
version = "1.3.7"
[[deps.StructTypes]]
deps = ["Dates", "UUIDs"]
git-tree-sha1 = "159331b30e94d7b11379037feeb9b690950cace8"
uuid = "856f2bd8-1eba-4b0a-8007-ebc267875bd4"
version = "1.11.0"
[[deps.StructUtils]]
deps = ["Dates", "UUIDs"]
git-tree-sha1 = "9297459be9e338e546f5c4bedb59b3b5674da7f1"
uuid = "ec057cc2-7a8d-4b58-b3b3-92acb9f63b42"
version = "2.6.2"
[deps.StructUtils.extensions]
StructUtilsMeasurementsExt = ["Measurements"]
StructUtilsTablesExt = ["Tables"]
[deps.StructUtils.weakdeps]
Measurements = "eff96d63-e80a-5855-80a2-b1b0885c5ab7"
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
[[deps.StyledStrings]]
uuid = "f489334b-da3d-4c2e-b8f0-e476e12c162b"
version = "1.11.0"
[[deps.SuiteSparse]]
deps = ["Libdl", "LinearAlgebra", "Serialization", "SparseArrays"]
uuid = "4607b0f0-06f3-5cda-b6b1-a6196a1729e9"
[[deps.SuiteSparse_jll]]
deps = ["Artifacts", "Libdl", "libblastrampoline_jll"]
uuid = "bea87d4a-7f5b-5778-9afe-8cc45184846c"
version = "7.8.3+2"
[[deps.TOML]]
deps = ["Dates"]
uuid = "fa267f1f-6049-4f14-aa54-33bafae1ed76"
version = "1.0.3"
[[deps.TZJData]]
deps = ["Artifacts"]
git-tree-sha1 = "72df96b3a595b7aab1e101eb07d2a435963a97e2"
uuid = "dc5dba14-91b3-4cab-a142-028a31da12f7"
version = "1.5.0+2025b"
[[deps.TableTraits]]
deps = ["IteratorInterfaceExtensions"]
git-tree-sha1 = "c06b2f539df1c6efa794486abfb6ed2022561a39"
uuid = "3783bdb8-4a98-5b6b-af9a-565f29a5fe9c"
version = "1.0.1"
[[deps.Tables]]
deps = ["DataAPI", "DataValueInterfaces", "IteratorInterfaceExtensions", "OrderedCollections", "TableTraits"]
git-tree-sha1 = "f2c1efbc8f3a609aadf318094f8fc5204bdaf344"
uuid = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
version = "1.12.1"
[[deps.Test]]
deps = ["InteractiveUtils", "Logging", "Random", "Serialization"]
uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
version = "1.11.0"
[[deps.TimeZones]]
deps = ["Artifacts", "Dates", "Downloads", "InlineStrings", "Mocking", "Printf", "Scratch", "TZJData", "Unicode", "p7zip_jll"]
git-tree-sha1 = "d422301b2a1e294e3e4214061e44f338cafe18a2"
uuid = "f269a46b-ccf7-5d73-abea-4c690281aa53"
version = "1.22.2"
[deps.TimeZones.extensions]
TimeZonesRecipesBaseExt = "RecipesBase"
[deps.TimeZones.weakdeps]
RecipesBase = "3cdcf5f2-1ef4-517c-9805-6587b60abb01"
[[deps.TranscodingStreams]]
git-tree-sha1 = "0c45878dcfdcfa8480052b6ab162cdd138781742"
uuid = "3bb67fe8-82b1-5028-8e26-92a6c54297fa"
version = "0.11.3"
[[deps.URIs]]
git-tree-sha1 = "bef26fb046d031353ef97a82e3fdb6afe7f21b1a"
uuid = "5c2747f8-b7ea-4ff2-ba2e-563bfd36b1d4"
version = "1.6.1"
[[deps.UUIDs]]
deps = ["Random", "SHA"]
uuid = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
version = "1.11.0"
[[deps.Unicode]]
uuid = "4ec0a83e-493e-50e2-b9ac-8f72acf5a8f5"
version = "1.11.0"
[[deps.WeakRefStrings]]
deps = ["DataAPI", "InlineStrings", "Parsers"]
git-tree-sha1 = "b1be2855ed9ed8eac54e5caff2afcdb442d52c23"
uuid = "ea10d353-3f73-51f8-a26c-33c1cb351aa5"
version = "1.4.2"
[[deps.WorkerUtilities]]
git-tree-sha1 = "cd1659ba0d57b71a464a29e64dbc67cfe83d54e7"
uuid = "76eceee3-57b5-4d4a-8e66-0e911cebbf60"
version = "1.6.1"
[[deps.Zlib_jll]]
deps = ["Libdl"]
uuid = "83775a58-1f1d-513f-b197-d71354ab007a"
version = "1.3.1+2"
[[deps.Zstd_jll]]
deps = ["Artifacts", "JLLWrappers", "Libdl"]
git-tree-sha1 = "446b23e73536f84e8037f5dce465e92275f6a308"
uuid = "3161d3a3-bdf6-5164-811a-617609db77b4"
version = "1.5.7+1"
[[deps.libblastrampoline_jll]]
deps = ["Artifacts", "Libdl"]
uuid = "8e850b90-86db-534c-a0d3-1478176c7d93"
version = "5.15.0+0"
[[deps.libsodium_jll]]
deps = ["Artifacts", "JLLWrappers", "Libdl"]
git-tree-sha1 = "011b0a7331b41c25524b64dc42afc9683ee89026"
uuid = "a9144af2-ca23-56d9-984f-0d03f7b5ccf8"
version = "1.0.21+0"
[[deps.nghttp2_jll]]
deps = ["Artifacts", "Libdl"]
uuid = "8e850ede-7688-5339-a07c-302acd2aaf8d"
version = "1.64.0+1"
[[deps.p7zip_jll]]
deps = ["Artifacts", "CompilerSupportLibraries_jll", "Libdl"]
uuid = "3f19e933-33d8-53b3-aaab-bd5110c3b7a0"
version = "17.7.0+0"

194
Plik_API_document.md Normal file
View File

@@ -0,0 +1,194 @@
### API
Plik server expose a REST-full API to manage uploads and get files :
Get and create upload :
- **POST** /upload
- Params (json object in request body) :
- oneshot (bool)
- stream (bool)
- removable (bool)
- ttl (int)
- login (string)
- password (string)
- files (see below)
- Return :
JSON formatted upload object.
Important fields :
- id (required to upload files)
- uploadToken (required to upload/remove files)
- files (see below)
For stream mode you need to know the file id before the upload starts as it will block.
File size and/or file type also need to be known before the upload starts as they have to be printed
in HTTP response headers.
To get the file ids pass a "files" json object with each file you are about to upload.
Fill the reference field with an arbitrary string to avoid matching file ids using the fileName field.
This is also used to notify of MISSING files when file upload is not yet finished or has failed.
```
"files" : [
{
"fileName": "file.txt",
"fileSize": 12345,
"fileType": "text/plain",
"reference": "0"
},...
]
```
- **GET** /upload/:uploadid:
- Get upload metadata (files list, upload date, ttl,...)
Upload file :
- **POST** /$mode/:uploadid:/:fileid:/:filename:
- Request body must be a multipart request with a part named "file" containing file data.
- **POST** /file/:uploadid:
- Same as above without passing file id, won't work for stream mode.
- **POST** /:
- Quick mode, automatically create an upload with default parameters and add the file to it.
Get file :
- **HEAD** /$mode/:uploadid:/:fileid:/:filename:
- Returns only HTTP headers. Useful to know Content-Type and Content-Length without downloading the file. Especially if upload has OneShot option enabled.
- **GET** /$mode/:uploadid:/:fileid:/:filename:
- Download file. Filename **MUST** match. A browser, might try to display the file if it's a jpeg for example. You may try to force download with ?dl=1 in url.
- **GET** /archive/:uploadid:/:filename:
- Download uploaded files in a zip archive. :filename: must end with .zip
Remove file :
- **DELETE** /$mode/:uploadid:/:fileid:/:filename:
- Delete file. Upload **MUST** have "removable" option enabled.
Show server details :
- **GET** /version
- Show plik server version, and some build information (build host, date, git revision,...)
- **GET** /config
- Show plik server configuration (ttl values, max file size, ...)
- **GET** /stats
- Get server statistics ( upload/file count, user count, total size used )
- Admin only
User authentication :
-
Plik can authenticate users using Google and/or OVH third-party API.
The /auth API is designed for the Plik web application nevertheless if you want to automatize it be sure to provide a valid
Referrer HTTP header and forward all session cookies.
Plik session cookies have the "secure" flag set, so they can only be transmitted over secure HTTPS connections.
To avoid CSRF attacks the value of the plik-xsrf cookie MUST be copied in the X-XSRFToken HTTP header of each
authenticated request.
Once authenticated a user can generate upload tokens. Those tokens can be used in the X-PlikToken HTTP header used to link
an upload to the user account. It can be put in the ~/.plikrc file of the Plik command line client.
- **Local** :
- You'll need to create users using the server command line
- **Google** :
- You'll need to create a new application in the [Google Developper Console](https://console.developers.google.com)
- You'll be handed a Google API ClientID and a Google API ClientSecret that you'll need to put in the plikd.cfg file
- Do not forget to whitelist valid origin and redirect url ( https://yourdomain/auth/google/callback ) for your domain
- **OVH** :
- You'll need to create a new application in the OVH API : https://eu.api.ovh.com/createApp/
- You'll be handed an OVH application key and an OVH application secret key that you'll need to put in the plikd.cfg file
- **GET** /auth/google/login
- Get Google user consent URL. User have to visit this URL to authenticate
- **GET** /auth/google/callback
- Callback of the user consent dialog
- The user will be redirected back to the web application with a Plik session cookie at the end of this call
- **GET** /auth/ovh/login
- Get OVH user consent URL. User have to visit this URL to authenticate
- The response will contain a temporary session cookie to forward the API endpoint and OVH consumer key to the callback
- **GET** /auth/ovh/callback
- Callback of the user consent dialog.
- The user will be redirected back to the web application with a Plik session cookie at the end of this call
- **POST** /auth/local/login
- Params :
- login : user login
- password : user password
- **GET** /auth/logout
- Invalidate Plik session cookies
- **GET** /me
- Return basic user info ( ID, name, email ) and tokens
- **DELETE** /me
- Remove user account.
- **GET** /me/token
- List user tokens
- This call use pagination
- **POST** /me/token
- Create a new upload token
- A comment can be passed in the json body
- **DELETE** /me/token/{token}
- Revoke an upload token
- **GET** /me/uploads
- List user uploads
- Params :
- token : filter by token
- This call use pagination
- **DELETE** /me/uploads
- Remove all uploads linked to a user account
- Params :
- token : filter by token
- **GET** /me/stats
- Get user statistics ( upload/file count, total size used )
- **GET** /users
- List all users
- This call use pagination
- Admin only
QRCode :
- **GET** /qrcode
- Generate a QRCode image from an url
- Params :
- url : The url you want to store in the QRCode
- size : The size of the generated image in pixels (default: 250, max: 1000)
$mode can be "file" or "stream" depending if stream mode is enabled. See FAQ for more details.
Examples :
```sh
Create an upload (in the json response, you'll have upload id and upload token)
$ curl -X POST http://127.0.0.1:8080/upload
Create a OneShot upload
$ curl -X POST -d '{ "OneShot" : true }' http://127.0.0.1:8080/upload
Upload a file to upload
$ curl -X POST --header "X-UploadToken: M9PJftiApG1Kqr81gN3Fq1HJItPENMhl" -F "file=@test.txt" http://127.0.0.1:8080/file/IsrIPIsDskFpN12E
Get headers
$ curl -I http://127.0.0.1:8080/file/IsrIPIsDskFpN12E/sFjIeokH23M35tN4/test.txt
HTTP/1.1 200 OK
Content-Disposition: filename=test.txt
Content-Length: 3486
Content-Type: text/plain; charset=utf-8
Date: Fri, 15 May 2015 09:16:20 GMT
```

8
Project.toml Normal file
View File

@@ -0,0 +1,8 @@
[deps]
Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
GeneralUtils = "c6c72f09-b708-4ac8-ac7c-2084d70108fe"
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
NATS = "55e73f9c-eeeb-467f-b4cc-a633fde63d2a"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"

294
architecture.md Normal file
View File

@@ -0,0 +1,294 @@
# Architecture Documentation: Bi-Directional Data Bridge (Julia ↔ JavaScript)
## Overview
This document describes the architecture for a high-performance, bi-directional data bridge between a Julia service and a JavaScript (Node.js) service using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
## Architecture Diagram
```mermaid
flowchart TD
subgraph Client
JS[JavaScript Client]
JSApp[Application Logic]
end
subgraph Server
Julia[Julia Service]
NATS[NATS Server]
FileServer[HTTP File Server]
end
JS -->|Control/Small Data| JSApp
JSApp -->|NATS| NATS
NATS -->|NATS| Julia
Julia -->|NATS| NATS
Julia -->|HTTP POST| FileServer
JS -->|HTTP GET| FileServer
style JS fill:#e1f5fe
style Julia fill:#e8f5e9
style NATS fill:#fff3e0
style FileServer fill:#f3e5f5
```
## System Components
### 1. Unified JSON Envelope Schema
All messages use a standardized envelope format:
```json
{
"correlation_id": "uuid-v4-string",
"type": "json|table|binary",
"transport": "direct|link",
"payload": "base64-encoded-string", // Only if transport=direct
"url": "http://fileserver/path/to/data", // Only if transport=link
"metadata": {
"content_type": "application/octet-stream",
"content_length": 123456,
"format": "arrow_ipc_stream"
}
}
```
### 2. Transport Strategy Decision Logic
```
┌─────────────────────────────────────────────────────────────┐
│ SmartSend Function │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Is payload size < 1MB? │
└─────────────────────────────────────────────────────────────┘
┌─────────────────┴─────────────────┐
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Direct Path │ │ Link Path │
│ (< 1MB) │ │ (> 1MB) │
│ │ │ │
│ • Serialize to │ │ • Serialize to │
│ IOBuffer │ │ IOBuffer │
│ • Base64 encode │ │ • Upload to │
│ • Publish to │ │ HTTP Server │
│ NATS │ │ • Publish to │
│ │ │ NATS with URL │
└─────────────────┘ └─────────────────┘
```
### 3. Julia Module Architecture
```mermaid
graph TD
subgraph JuliaModule
SmartSendJulia[SmartSend Julia]
SizeCheck[Size Check]
DirectPath[Direct Path]
LinkPath[Link Path]
HTTPClient[HTTP Client]
end
SmartSendJulia --> SizeCheck
SizeCheck -->|< 1MB| DirectPath
SizeCheck -->|>= 1MB| LinkPath
LinkPath --> HTTPClient
style JuliaModule fill:#c5e1a5
```
### 4. JavaScript Module Architecture
```mermaid
graph TD
subgraph JSModule
SmartSendJS[SmartSend JS]
SmartReceiveJS[SmartReceive JS]
JetStreamConsumer[JetStream Pull Consumer]
ApacheArrow[Apache Arrow]
end
SmartSendJS --> NATS
SmartReceiveJS --> JetStreamConsumer
JetStreamConsumer --> ApacheArrow
style JSModule fill:#f3e5f5
```
## Implementation Details
### Julia Implementation
#### Dependencies
- `NATS.jl` - Core NATS functionality
- `Arrow.jl` - Arrow IPC serialization
- `JSON3.jl` - JSON parsing
- `HTTP.jl` - HTTP client for file server
- `Dates.jl` - Timestamps for logging
#### SmartSend Function
```julia
function SmartSend(
subject::String,
data::Any,
type::String = "json";
nats_url::String = "nats://localhost:4222",
fileserver_url::String = "http://localhost:8080/upload",
size_threshold::Int = 1_000_000 # 1MB
)
```
**Flow:**
1. Serialize data to Arrow IPC stream (if table)
2. Check payload size
3. If < threshold: publish directly to NATS with Base64-encoded payload
4. If >= threshold: upload to HTTP server, publish NATS with URL
#### SmartReceive Handler
```julia
function SmartReceive(msg::NATS.Message)
# Parse envelope
# Check transport type
# If direct: decode Base64 payload
# If link: fetch from URL with exponential backoff
# Deserialize Arrow IPC to DataFrame
end
```
### JavaScript Implementation
#### Dependencies
- `nats.js` - Core NATS functionality
- `apache-arrow` - Arrow IPC serialization
- `uuid` - Correlation ID generation
#### SmartSend Function
```javascript
async function SmartSend(subject, data, type = 'json', options = {})
```
**Flow:**
1. Serialize data to Arrow IPC buffer (if table)
2. Check payload size
3. If < threshold: publish directly to NATS
4. If >= threshold: upload to HTTP server, publish NATS with URL
#### SmartReceive Handler
```javascript
async function SmartReceive(msg, options = {})
```
**Flow:**
1. Parse envelope
2. Check transport type
3. If direct: decode Base64 payload
4. If link: fetch with exponential backoff
5. Deserialize Arrow IPC with zero-copy
## Scenario Implementations
### Scenario 1: Command & Control (Small JSON)
**Julia (Receiver):**
```julia
# Subscribe to control subject
# Parse JSON envelope
# Execute simulation with parameters
# Send acknowledgment
```
**JavaScript (Sender):**
```javascript
// Create small JSON config
// Send via SmartSend with type="json"
```
### Scenario 2: Deep Dive Analysis (Large Arrow Table)
**Julia (Sender):**
```julia
# Create large DataFrame
# Convert to Arrow IPC stream
# Check size (> 1MB)
# Upload to HTTP server
# Publish NATS with URL
```
**JavaScript (Receiver):**
```javascript
// Receive NATS message with URL
// Fetch data from HTTP server
// Parse Arrow IPC with zero-copy
// Load into Perspective.js or D3
```
### Scenario 3: Live Audio Processing
**JavaScript (Sender):**
```javascript
// Capture audio chunk
// Send as binary with metadata headers
// Use SmartSend with type="audio"
```
**Julia (Receiver):**
```julia
// Receive audio data
// Perform FFT or AI transcription
// Send results back (JSON + Arrow table)
```
### Scenario 4: Catch-Up (JetStream)
**Julia (Producer):**
```julia
# Publish to JetStream
# Include metadata for temporal tracking
```
**JavaScript (Consumer):**
```javascript
// Connect to JetStream
// Request replay from last 10 minutes
// Process historical and real-time messages
```
## Performance Considerations
### Zero-Copy Reading
- Use Arrow's memory-mapped file reading
- Avoid unnecessary data copying during deserialization
- Use Apache Arrow's native IPC reader
### Exponential Backoff
- Implement exponential backoff for HTTP link fetching
- Maximum retry count: 5
- Base delay: 100ms, max delay: 5000ms
### Correlation ID Logging
- Log correlation_id at every stage
- Include: send, receive, serialize, deserialize
- Use structured logging format
## Testing Strategy
### Unit Tests
- Test SmartSend with various payload sizes
- Test SmartReceive with direct and link transport
- Test Arrow IPC serialization/deserialization
### Integration Tests
- Test full flow with NATS server
- Test large data transfer (> 100MB)
- Test audio processing pipeline
### Performance Tests
- Measure throughput for small payloads
- Measure throughput for large payloads

321
docs/IMPLEMENTATION.md Normal file
View File

@@ -0,0 +1,321 @@
# Implementation Guide: Bi-Directional Data Bridge
## Overview
This document describes the implementation of the high-performance, bi-directional data bridge between Julia and JavaScript services using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads.
## Architecture
The implementation follows the Claim-Check pattern:
```
┌─────────────────────────────────────────────────────────────────────────┐
│ SmartSend Function │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ Is payload size < 1MB? │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────┴─────────────────┐
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Direct Path │ │ Link Path │
│ (< 1MB) │ │ (> 1MB) │
│ │ │ │
│ • Serialize to │ │ • Serialize to │
│ IOBuffer │ │ IOBuffer │
│ • Base64 encode │ │ • Upload to │
│ • Publish to │ │ HTTP Server │
│ NATS │ │ • Publish to │
│ │ │ NATS with URL │
└─────────────────┘ └─────────────────┘
```
## Files
### Julia Module: [`src/julia_bridge.jl`](../src/julia_bridge.jl)
The Julia implementation provides:
- **[`MessageEnvelope`](../src/julia_bridge.jl)**: Struct for the unified JSON envelope
- **[`SmartSend()`](../src/julia_bridge.jl)**: Handles transport selection based on payload size
- **[`SmartReceive()`](../src/julia_bridge.jl)**: Handles both direct and link transport
### JavaScript Module: [`src/js_bridge.js`](../src/js_bridge.js)
The JavaScript implementation provides:
- **`MessageEnvelope` class**: For the unified JSON envelope
- **[`SmartSend()`](../src/js_bridge.js)**: Handles transport selection based on payload size
- **[`SmartReceive()`](../src/js_bridge.js)**: Handles both direct and link transport
## Installation
### Julia Dependencies
```julia
using Pkg
Pkg.add("NATS")
Pkg.add("Arrow")
Pkg.add("JSON3")
Pkg.add("HTTP")
Pkg.add("UUIDs")
Pkg.add("Dates")
```
### JavaScript Dependencies
```bash
npm install nats.js apache-arrow uuid base64-url
```
## Usage Tutorial
### Step 1: Start NATS Server
```bash
docker run -p 4222:4222 nats:latest
```
### Step 2: Start HTTP File Server (optional)
```bash
# Create a directory for file uploads
mkdir -p /tmp/fileserver
# Use any HTTP server that supports POST for file uploads
# Example: Python's built-in server
python3 -m http.server 8080 --directory /tmp/fileserver
```
### Step 3: Run Test Scenarios
```bash
# Scenario 1: Command & Control (JavaScript sender)
node test/scenario1_command_control.js
# Scenario 2: Large Arrow Table (JavaScript sender)
node test/scenario2_large_table.js
# Scenario 3: Julia-to-Julia communication
# Run both Julia and JavaScript versions
julia test/scenario3_julia_to_julia.jl
node test/scenario3_julia_to_julia.js
```
## Usage
### Scenario 1: Command & Control (Small JSON)
#### JavaScript (Sender)
```javascript
const { SmartSend } = require('./js_bridge');
const config = {
step_size: 0.01,
iterations: 1000
};
await SmartSend("control", config, "json", {
correlationId: "unique-id"
});
```
#### Julia (Receiver)
```julia
using NATS
using JSON3
# Subscribe to control subject
subscribe(nats, "control") do msg
env = MessageEnvelope(String(msg.data))
config = JSON3.read(env.payload)
# Execute simulation with parameters
step_size = config.step_size
iterations = config.iterations
# Send acknowledgment
response = Dict("status" => "Running", "correlation_id" => env.correlation_id)
publish(nats, "control_response", JSON3.stringify(response))
end
```
### Scenario 2: Deep Dive Analysis (Large Arrow Table)
#### Julia (Sender)
```julia
using Arrow
using DataFrames
# Create large DataFrame
df = DataFrame(
id = 1:10_000_000,
value = rand(10_000_000),
category = rand(["A", "B", "C"], 10_000_000)
)
# Send via SmartSend with type="table"
await SmartSend("analysis_results", df, "table");
```
#### JavaScript (Receiver)
```javascript
const { SmartReceive } = require('./js_bridge');
const result = await SmartReceive(msg);
// Use table data for visualization with Perspective.js or D3
const table = result.data;
```
### Scenario 3: Live Binary Processing
#### JavaScript (Sender)
```javascript
const { SmartSend } = require('./js_bridge');
// Capture binary chunk
const binaryData = await navigator.mediaDevices.getUserMedia({ binary: true });
await SmartSend("binary_input", binaryData, "binary", {
metadata: {
sample_rate: 44100,
channels: 1
}
});
```
#### Julia (Receiver)
```julia
using WAV
using DSP
# Receive binary data
function process_binary(data)
# Perform FFT or AI transcription
spectrum = fft(data)
# Send results back (JSON + Arrow table)
results = Dict("transcription" => "sample text", "spectrum" => spectrum)
await SmartSend("binary_output", results, "json")
end
```
### Scenario 4: Catch-Up (JetStream)
#### Julia (Producer)
```julia
using NATS
function publish_health_status(nats)
jetstream = JetStream(nats, "health_updates")
while true
status = Dict("cpu" => rand(), "memory" => rand())
publish(jetstream, "health", status)
sleep(5) # Every 5 seconds
end
end
```
#### JavaScript (Consumer)
```javascript
const { connect } = require('nats');
const nc = await connect({ servers: ['nats://localhost:4222'] });
const js = nc.jetstream();
// Request replay from last 10 minutes
const consumer = await js.pullSubscribe("health", {
durable_name: "catchup",
max_batch: 100,
max_ack_wait: 30000
});
// Process historical and real-time messages
for await (const msg of consumer) {
const result = await SmartReceive(msg);
// Process the data
msg.ack();
}
```
## Configuration
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `NATS_URL` | `nats://localhost:4222` | NATS server URL |
| `FILESERVER_URL` | `http://localhost:8080/upload` | HTTP file server URL |
| `SIZE_THRESHOLD` | `1_000_000` | Size threshold in bytes (1MB) |
### Message Envelope Schema
```json
{
"correlation_id": "uuid-v4-string",
"type": "json|table|binary",
"transport": "direct|link",
"payload": "base64-encoded-string", // Only if transport=direct
"url": "http://fileserver/path/to/data", // Only if transport=link
"metadata": {
"content_type": "application/octet-stream",
"content_length": 123456,
"format": "arrow_ipc_stream"
}
}
```
## Performance Considerations
### Zero-Copy Reading
- Use Arrow's memory-mapped file reading
- Avoid unnecessary data copying during deserialization
- Use Apache Arrow's native IPC reader
### Exponential Backoff
- Maximum retry count: 5
- Base delay: 100ms, max delay: 5000ms
- Implemented in both Julia and JavaScript implementations
### Correlation ID Logging
- Log correlation_id at every stage
- Include: send, receive, serialize, deserialize
- Use structured logging format
## Testing
Run the test scripts:
```bash
# Scenario 1: Command & Control (JavaScript sender)
node test/scenario1_command_control.js
# Scenario 2: Large Arrow Table (JavaScript sender)
node test/scenario2_large_table.js
```
## Troubleshooting
### Common Issues
1. **NATS Connection Failed**
- Ensure NATS server is running
- Check NATS_URL configuration
2. **HTTP Upload Failed**
- Ensure file server is running
- Check FILESERVER_URL configuration
- Verify upload permissions
3. **Arrow IPC Deserialization Error**
- Ensure data is properly serialized to Arrow format
- Check Arrow version compatibility
## License
MIT

42
etc.jl Normal file
View File

@@ -0,0 +1,42 @@
""" fileServerURL = "http://192.168.88.104:8080"
filepath = "/home/ton/docker-apps/sendreceive/image/test.zip"
filename = basename(filepath)
filebytes = read(filepath)
plik_oneshot_upload - Upload a single file to a plik server using one-shot mode
This function uploads a raw byte array to a plik server in one-shot mode (no upload session).
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
The function handles the entire flow:
1. Obtains an upload ID and token from the server
2. Uploads the provided binary data as a file using the `X-UploadToken` header
3. Returns identifiers and download URL for the uploaded file
# Arguments:
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://192.168.88.104:8080"`)
- `filename::String` - Name of the file being uploaded
- `data::Vector{UInt8}` - Raw byte data of the file content
# Return:
- A named tuple with fields:
- `uploadid::String` - ID of the one-shot upload session
- `fileid::String` - ID of the uploaded file within the session
- `downloadurl::String` - Full URL to download the uploaded file
# Example
```jldoctest
using HTTP, JSON
# Example data: "Hello" as bytes
data = collect("Hello World!" |> collect |> CodeUnits |> collect)
# Upload to local plik server
result = plik_oneshot_upload("http://192.168.88.104:8080", "hello.txt", data)
# Download URL for the uploaded file
println(result.downloadurl)
```
"""

BIN
largefile.zip Normal file

Binary file not shown.

669
src/NATSBridge.jl Normal file
View File

@@ -0,0 +1,669 @@
# Bi-Directional Data Bridge - Julia Module
# Implements smartsend and smartreceive for NATS communication
# This module provides functionality for sending and receiving data across network boundaries
# using NATS as the message bus, with support for both direct payload transport and
# URL-based transport for larger payloads.
module NATSBridge
using NATS, JSON, Arrow, HTTP, UUIDs, Dates
# ---------------------------------------------- 100 --------------------------------------------- #
# Constants
const DEFAULT_SIZE_THRESHOLD = 1_000_000 # 1MB - threshold for switching from direct to link transport
const DEFAULT_NATS_URL = "nats://localhost:4222" # Default NATS server URL
const DEFAULT_FILESERVER_URL = "http://localhost:8080/upload" # Default HTTP file server URL for link transport
""" Struct for the unified JSON envelope
This struct represents a standardized message format that can carry either
direct payload data or a URL reference, allowing flexible transport strategies
based on payload size and requirements.
"""
struct MessageEnvelope
correlation_id::String # Unique identifier to track messages across systems
type::String # Data type indicator (e.g., "json", "table", "binary")
transport::String # Transport strategy: "direct" (base64 encoded bytes) or "link" (URL reference)
payload::Union{String, Nothing} # Base64-encoded payload for direct transport
url::Union{String, Nothing} # URL reference for link transport
metadata::Dict{String, Any} # Additional metadata about the payload
end
""" Constructor for MessageEnvelope with keyword arguments and defaults
This constructor provides a convenient way to create an envelope using keyword arguments,
automatically generating a correlation ID if not provided, and defaulting to "json" type
and "direct" transport.
"""
function MessageEnvelope(
; correlation_id::String = string(uuid4()), # Generate unique ID if not provided
type::String = "json", # Default data type
transport::String = "direct", # Default transport method
payload::Union{String, Nothing} = nothing, # No payload by default
url::Union{String, Nothing} = nothing, # No URL by default
metadata::Dict{String, Any} = Dict{String, Any}() # Empty metadata by default
)
MessageEnvelope(correlation_id, type, transport, payload, url, metadata)
end
""" Constructor for MessageEnvelope from JSON string
This constructor parses a JSON string and reconstructs a MessageEnvelope struct.
It handles the metadata field specially by converting the JSON object to a Julia Dict,
extracting values from the JSON structure for all other fields.
"""
function MessageEnvelope(json_str::String)
data = JSON.parse(json_str) # Parse JSON string into Julia data structure
metadata = Dict{String, Any}()
if haskey(data, :metadata) # Check if metadata exists in JSON
metadata = Dict(String(k) => v for (k, v) in data.metadata) # Convert JSON keys to strings and store in Dict
end
MessageEnvelope(
correlation_id = String(data.correlation_id), # Extract correlation_id from JSON data
type = String(data.type), # Extract type from JSON data
transport = String(data.transport), # Extract transport from JSON data
payload = haskey(data, :payload) ? String(data.payload) : nothing, # Extract payload if present
url = haskey(data, :url) ? String(data.url) : nothing, # Extract URL if present
metadata = metadata # Use the parsed metadata
)
end
""" Convert MessageEnvelope to JSON string
This function converts the MessageEnvelope struct to a JSON string representation.
It only includes fields in the JSON output if they have non-nothing values,
making the JSON output cleaner and more efficient.
"""
function envelope_to_json(env::MessageEnvelope)
obj = Dict{String, Any}(
"correlation_id" => env.correlation_id, # Always include correlation_id
"type" => env.type, # Always include type
"transport" => env.transport # Always include transport
)
if env.payload !== nothing # Only include payload if it exists
obj["payload"] = env.payload
end
if env.url !== nothing # Only include URL if it exists
obj["url"] = env.url
end
if !isempty(env.metadata) # Only include metadata if it exists and is not empty
obj["metadata"] = env.metadata
end
JSON.json(obj) # Convert Dict to JSON string
end
""" Log a trace message with correlation ID and timestamp
This function logs information messages with a correlation ID for tracing purposes,
making it easier to track message flow across distributed systems.
"""
function log_trace(correlation_id::String, message::String)
timestamp = Dates.now() # Get current timestamp
@info "[$timestamp] [Correlation: $correlation_id] $message" # Log formatted message
end
""" smartsend - Send data either directly via NATS or via a fileserver URL, depending on payload size
This function intelligently routes data delivery based on payload size relative to a threshold.
If the serialized payload is smaller than `size_threshold`, it encodes the data as Base64 and publishes directly over NATS.
Otherwise, it uploads the data to a fileserver (by default using `plik_oneshot_upload`) and publishes only the download URL over NATS.
The function workflow:
1. Serializes the provided data according to the specified format (`type`)
2. Compares the serialized size against `size_threshold`
3. For small payloads: encodes as Base64, constructs a "direct" MessageEnvelope, and publishes to NATS
4. For large payloads: uploads to the fileserver, constructs a "link" MessageEnvelope with the URL, and publishes to NATS
# Arguments:
- `subject::String` - NATS subject to publish the message to
- `data::Any` - Data payload to send (any Julia object)
- `type::String = "json"` - Serialization format: `"json"` or `"arrow"`
# Keyword Arguments:
- `dataname::String = string(UUIDs.uuid4())` - Filename to use when uploading to fileserver (auto-generated UUID if not provided)
- `nats_url::String = DEFAULT_NATS_URL` - URL of the NATS server
- `fileserver_url::String = DEFAULT_FILESERVER_URL` - Base URL of the fileserver (e.g., `"http://localhost:8080"`)
- `fileServerUploadHandler::Function = plik_oneshot_upload` - Function to handle fileserver uploads (must match signature of `plik_oneshot_upload`)
- `size_threshold::Int = DEFAULT_SIZE_THRESHOLD` - Threshold in bytes separating direct vs link transport
- `correlation_id::Union{String, Nothing} = nothing` - Optional correlation ID for tracing; if `nothing`, a UUID is generated
# Return:
- A `MessageEnvelope` object containing metadata and transport information:
- `correlation_id::String` - Unique identifier for this message exchange
- `type::String` - Serialization type used (`"json"` or `"arrow"`)
- `transport::String` - Either `"direct"` or `"link"`
- `payload::Union{String, Nothing}` - Base64-encoded data for direct transport, `nothing` for link transport
- `url::Union{String, Nothing}` - Download URL for link transport, `nothing` for direct transport
- `metadata::Dict` - Additional metadata (e.g., `"content_length"`, `"format"`)
# Example
```julia
using UUIDs
# Send a small struct directly via NATS
data = Dict("key" => "value")
env = smartsend("my.subject", data, "json")
# Send a large array using fileserver upload
data = rand(10_000_000) # ~80 MB
env = smartsend("large.data", data, "arrow")
# In another process, retrieve and deserialize:
# msg = subscribe(nats_url, "my.subject")
# env = json_to_envelope(msg.data)
# data = _deserialize_data(base64decode(env.payload), env.type)
```
"""
function smartsend(
subject::String, # smartreceive's subject
data::Any,
type::String = "json";
dataname=string(UUIDs.uuid4()),
nats_url::String = DEFAULT_NATS_URL,
fileserver_url::String = DEFAULT_FILESERVER_URL,
fileServerUploadHandler::Function=plik_oneshot_upload, # a function to handle uploading data to specific HTTP fileserver
size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
correlation_id::Union{String, Nothing} = nothing
)
# Generate correlation ID if not provided
cid = correlation_id !== nothing ? correlation_id : string(uuid4()) # Create or use provided correlation ID
log_trace(cid, "Starting smartsend for subject: $subject") # Log start of send operation
# Serialize data based on type
payload_bytes = _serialize_data(data, type) # Convert data to bytes based on type
payload_size = length(payload_bytes) # Calculate payload size in bytes
log_trace(cid, "Serialized payload size: $payload_size bytes") # Log payload size
# Decision: Direct vs Link
if payload_size < size_threshold # Check if payload is small enough for direct transport
# Direct path - Base64 encode and send via NATS
payload_b64 = base64encode(payload_bytes) # Encode bytes as base64 string
log_trace(cid, "Using direct transport for $payload_size bytes") # Log transport choice
env = MessageEnvelope( # Create envelope for direct transport
correlation_id = cid,
type = type,
transport = "direct",
payload = payload_b64,
metadata = Dict("content_length" => payload_size, "format" => "arrow_ipc_stream")
)
msg_json = envelope_to_json(env) # Convert envelope to JSON
publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS
return env # Return the envelope for tracking
else
# Link path - Upload to HTTP server, send URL via NATS
log_trace(cid, "Using link transport, uploading to fileserver") # Log link transport choice
# Upload to HTTP server
response = fileServerUploadHandler(fileserver_url, dataname, payload_bytes)
if response[:status] != 200 # Check if upload was successful
error("Failed to upload data to fileserver: $(response[:status])") # Throw error if upload failed
end
url = response[:url] # URL for the uploaded data
log_trace(cid, "Uploaded to URL: $url") # Log successful upload
env = MessageEnvelope( # Create envelope for link transport
correlation_id = cid,
type = type,
transport = "link",
url = url,
metadata = Dict("content_length" => payload_size, "format" => "arrow_ipc_stream")
)
msg_json = envelope_to_json(env) # Convert envelope to JSON
publish_message(nats_url, subject, msg_json, cid) # Publish message to NATS
return env # Return the envelope for tracking
end
end
""" _serialize_data - Serialize data according to specified format
This function serializes arbitrary Julia data into a binary representation based on the specified format.
It supports three serialization formats:
- `"json"`: Serializes data as JSON and returns the UTF-8 byte representation
- `"table"`: Serializes data as an Arrow IPC stream (table format) and returns the byte stream
- `"binary"`: Expects already-binary data (either `IOBuffer` or `Vector{UInt8}`) and returns it as bytes
The function handles format-specific serialization logic:
1. For `"json"`: Converts Julia data to JSON string, then encodes to bytes
2. For `"table"`: Uses Arrow.jl to write data as an Arrow IPC stream to an in-memory buffer
3. For `"binary"`: Extracts bytes from `IOBuffer` or returns `Vector{UInt8}` directly
# Arguments:
- `data::Any` - Data to serialize (JSON-serializable for `"json"`, table-like for `"table"`, binary for `"binary"`)
- `type::String` - Target format: `"json"`, `"table"`, or `"binary"`
# Return:
- `Vector{UInt8}` - Binary representation of the serialized data
# Throws:
- `Error` if `type` is not one of `"json"`, `"table"`, or `"binary"`
- `Error` if `type == "binary"` but `data` is neither `IOBuffer` nor `Vector{UInt8}`
# Example
```julia
using JSON, Arrow, DataFrames
# JSON serialization
json_data = Dict("name" => "Alice", "age" => 30)
json_bytes = _serialize_data(json_data, "json")
# Table serialization with a DataFrame (recommended for tabular data)
df = DataFrame(id = 1:3, name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92])
table_bytes = _serialize_data(df, "table")
# Table serialization with named tuple of vectors (also supported)
nt = (id = [1, 2, 3], name = ["Alice", "Bob", "Charlie"], score = [95, 88, 92])
table_bytes_nt = _serialize_data(nt, "table")
# Binary data (IOBuffer)
buf = IOBuffer()
write(buf, "hello")
binary_bytes = _serialize_data(buf, "binary")
# Binary data (already bytes)
binary_bytes_direct = _serialize_data(UInt8[1, 2, 3], "binary")
```
"""
function _serialize_data(data::Any, type::String)
if type == "json" # JSON data - serialize directly
json_str = JSON.json(data) # Convert Julia data to JSON string
return bytes(json_str) # Convert JSON string to bytes
elseif type == "table" # Table data - convert to Arrow IPC stream
io = IOBuffer() # Create in-memory buffer
Arrow.write(io, data) # Write data as Arrow IPC stream to buffer
return take!(io) # Return the buffer contents as bytes
elseif type == "binary" # Binary data - treat as binary
if isa(data, IOBuffer) # Check if data is an IOBuffer
return take!(data) # Return buffer contents as bytes
elseif isa(data, Vector{UInt8}) # Check if data is already binary
return data # Return binary data directly
else # Unsupported binary data type
error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
end
else # Unknown type
error("Unknown type: $type")
end
end
""" Publish message to NATS
This internal function publishes a message to a NATS subject with proper
connection management and logging.
Arguments:
- `nats_url::String` - NATS server URL
- `subject::String` - NATS subject to publish to
- `message::String` - JSON message to publish
- `correlation_id::String` - Correlation ID for logging
"""
function publish_message(nats_url::String, subject::String, message::String, correlation_id::String)
conn = NATS.connect(nats_url) # Create NATS connection
try
NATS.publish(conn, subject, message) # Publish message to NATS
log_trace(correlation_id, "Message published to $subject") # Log successful publish
finally
NATS.drain(conn) # Ensure connection is closed properly
end
end
""" smartreceive - Receive and process messages from NATS
This function processes incoming NATS messages, handling both direct transport
(base64 decoded payloads) and link transport (URL-based payloads).
It deserializes the data based on the transport type and returns the result.
A HTTP file server is required along with its upload function.
Arguments:
- `msg::NATS.Message` - NATS message to process
Keyword Arguments:
- `fileserver_url::String` - HTTP file server URL for link transport (default: DEFAULT_FILESERVER_URL)
- `max_retries::Int` - Maximum retry attempts for fetching URL (default: 5)
- `base_delay::Int` - Initial delay for exponential backoff in ms (default: 100)
- `max_delay::Int` - Maximum delay for exponential backoff in ms (default: 5000)
Return:
- Tuple `(data = deserialized_data, envelope = MessageEnvelope)` - Data and envelope
"""
function smartreceive(
msg::NATS.Msg;
fileserver_url::String = DEFAULT_FILESERVER_URL,
max_retries::Int = 5,
base_delay::Int = 100,
max_delay::Int = 5000
)
# Parse the envelope
env = MessageEnvelope(String(msg.data)) # Parse NATS message data as JSON envelope
log_trace(env.correlation_id, "Processing received message") # Log message processing start
# Check transport type
if env.transport == "direct" # Direct transport - payload is in the message
log_trace(env.correlation_id, "Direct transport - decoding payload") # Log direct transport handling
# Decode Base64 payload
payload_bytes = base64decode(env.payload) # Decode base64 payload to bytes
# Deserialize based on type
data = _deserialize_data(payload_bytes, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data
return (data = data, envelope = env) # Return data and envelope as tuple
elseif env.transport == "link" # Link transport - payload is at URL
log_trace(env.correlation_id, "Link transport - fetching from URL") # Log link transport handling
# Fetch with exponential backoff
data = _fetch_with_backoff(env.url, max_retries, base_delay, max_delay, env.correlation_id) # Fetch data from URL
# Deserialize based on type
result = _deserialize_data(data, env.type, env.correlation_id, env.metadata) # Convert bytes to Julia data
return (data = result, envelope = env) # Return data and envelope as tuple
else # Unknown transport type
error("Unknown transport type: $(env.transport)") # Throw error for unknown transport
end
end
""" Fetch data from URL with exponential backoff
This internal function retrieves data from a URL with retry logic using
exponential backoff to handle transient failures.
Arguments:
- `url::String` - URL to fetch from
- `max_retries::Int` - Maximum number of retry attempts
- `base_delay::Int` - Initial delay in milliseconds
- `max_delay::Int` - Maximum delay in milliseconds
- `correlation_id::String` - Correlation ID for logging
Return:
- Vector{UInt8} - Fetched data as bytes
"""
function _fetch_with_backoff(
url::String,
max_retries::Int,
base_delay::Int,
max_delay::Int,
correlation_id::String
)
delay = base_delay # Initialize delay with base delay value
for attempt in 1:max_retries # Attempt to fetch data up to max_retries times
try
response = HTTP.request("GET", url) # Make HTTP GET request to URL
if response.status == 200 # Check if request was successful
log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt") # Log success
return response.body # Return response body as bytes
else # Request failed
error("Failed to fetch: $(response.status)") # Throw error for non-200 status
end
catch e # Handle exceptions during fetch
log_trace(correlation_id, "Attempt $attempt failed: $(typeof(e))") # Log failure
if attempt < max_retries # Only sleep if not the last attempt
sleep(delay / 1000.0) # Sleep for delay seconds (convert from ms)
delay = min(delay * 2, max_delay) # Double delay for next attempt, capped at max_delay
end
end
end
error("Failed to fetch data after $max_retries attempts") # Throw error if all attempts failed
end
""" Deserialize bytes to data based on type
This internal function converts serialized bytes back to Julia data based on type.
It handles "json" (JSON deserialization), "table" (Arrow IPC deserialization),
and "binary" (binary data).
Arguments:
- `data::Vector{UInt8}` - Serialized data as bytes
- `type::String` - Data type ("json", "table", "binary")
- `correlation_id::String` - Correlation ID for logging
- `metadata::Dict{String, Any}` - Metadata about the data
Return:
- Deserialized data (DataFrame for "table", JSON data for "json", bytes for "binary")
"""
function _deserialize_data(
data::Vector{UInt8},
type::String,
correlation_id::String,
metadata::Dict{String, Any}
)
if type == "json" # JSON data - deserialize
json_str = String(data) # Convert bytes to string
return JSON.parse(json_str) # Parse JSON string to Julia data structure
elseif type == "table" # Table data - deserialize Arrow IPC stream
io = IOBuffer(data) # Create buffer from bytes
df = Arrow.Table(io) # Read Arrow IPC format from buffer
return df # Return DataFrame
elseif type == "binary" # Binary data - return binary
return data # Return bytes directly
else # Unknown type
error("Unknown type: $type") # Throw error for unknown type
end
end
""" Decode base64 string to bytes
This internal function decodes a base64-encoded string back to binary data.
It's a wrapper around Base64.decode for consistency in the module.
Arguments:
- `str::String` - Base64-encoded string to decode
Return:
- Vector{UInt8} - Decoded binary data
"""
function base64decode(str::String)
return Base64.decode(str) # Decode base64 string to bytes using Julia's Base64 module
end
""" plik_oneshot_upload - Upload a single file to a plik server using one-shot mode
This function uploads a raw byte array to a plik server in one-shot mode (no upload session).
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
The function workflow:
1. Obtains an upload ID and token from the server
2. Uploads the provided binary data as a file using the `X-UploadToken` header
3. Returns identifiers and download URL for the uploaded file
# Arguments:
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
- `filename::String` - Name of the file being uploaded
- `data::Vector{UInt8}` - Raw byte data of the file content
# Return:
- A named tuple with fields:
- `status::Integer` - HTTP server response status
- `uploadid::String` - ID of the one-shot upload session
- `fileid::String` - ID of the uploaded file within the session
- `url::String` - Full URL to download the uploaded file
# Example
```jldoctest
using HTTP, JSON
fileServerURL = "http://localhost:8080"
filepath = "./test.zip"
filename = basename(filepath)
filebytes = read(filepath) # read(filepath) output is raw bytes of the file
# Upload to local plik server
status, uploadid, fileid, url = plik_oneshot_upload(fileServerURL, filename, filebytes)
# to download an uploaded file
curl -L -O "url"
```
""" #[x]
function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8})
# ----------------------------------------- get upload id ---------------------------------------- #
# Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
url_getUploadID = "$fileServerURL/upload" # URL to get upload ID
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(httpResponse.body)
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
# ------------------------------------------ upload file ----------------------------------------- #
# Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
file_multipart = HTTP.Multipart(filename, IOBuffer(data), "application/octet-stream") # Plik won't accept raw bytes upload
url_upload = "$fileServerURL/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
# Create the multipart form data
form = HTTP.Form(Dict(
"file" => file_multipart
))
# Execute the POST request
httpResponse = nothing
try
httpResponse = HTTP.post(url_upload, headers, form)
# println("Status: ", httpResponse.status)
responseJson = JSON.parse(httpResponse.body)
catch e
@error "Request failed" exception=e
end
fileid=responseJson["id"]
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
url = "$fileServerURL/file/$uploadid/$fileid/$filename"
return (status=httpResponse.status, uploadid=uploadid, fileid=fileid, url=url)
end
""" plik_oneshot_upload(fileServerURL::String, filepath::String)
Upload a single file to a plik server using one-shot mode.
This function uploads a file from disk to a plik server in one-shot mode (no upload session).
It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`,
retrieves an upload ID and token, then uploads the file data as multipart form data using the token.
The function workflow:
1. Obtains an upload ID and token from the server
2. Uploads the file at `filepath` using multipart form data and the `X-UploadToken` header
3. Returns identifiers and download URL for the uploaded file
# Arguments:
- `fileServerURL::String` - Base URL of the plik server (e.g., `"http://localhost:8080"`)
- `filepath::String` - Full path to the local file to upload
# Return:
- A named tuple with fields:
- `status::Integer` - HTTP server response status
- `uploadid::String` - ID of the one-shot upload session
- `fileid::String` - ID of the uploaded file within the session
- `url::String` - Full URL to download the uploaded file
# Example
```julia
using HTTP, JSON
fileServerURL = "http://localhost:8080"
filepath = "./test.zip"
# Upload to local plik server
status, uploadid, fileid, url = plik_oneshot_upload(fileServerURL, filepath)
# To download the uploaded file later (via curl as example):
curl -L -O "url"
```
""" #[x]
function plik_oneshot_upload(fileServerURL::String, filepath::String)
# ----------------------------------------- get upload id ---------------------------------------- #
# Equivalent curl command: curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
filename = basename(filepath)
url_getUploadID = "$fileServerURL/upload" # URL to get upload ID
headers = ["Content-Type" => "application/json"]
body = """{ "OneShot" : true }"""
httpResponse = HTTP.request("POST", url_getUploadID, headers, body; body_is_form=false)
responseJson = JSON.parse(httpResponse.body)
uploadid = responseJson["id"]
uploadtoken = responseJson["uploadToken"]
println("uploadid = ", uploadid)
# ------------------------------------------ upload file ----------------------------------------- #
# Equivalent curl command: curl -X POST --header "X-UploadToken: UPLOAD_TOKEN" -F "file=@PATH_TO_FILE" http://localhost:8080/file/UPLOAD_ID
file_multipart = open(filepath, "r")
url_upload = "$fileServerURL/file/$uploadid"
headers = ["X-UploadToken" => uploadtoken]
# Create the multipart form data
form = HTTP.Form(Dict(
"file" => file_multipart
))
# Execute the POST request
httpResponse = nothing
try
httpResponse = HTTP.post(url_upload, headers, form)
# println("Status: ", httpResponse.status)
responseJson = JSON.parse(httpResponse.body)
catch e
@error "Request failed" exception=e
end
fileid=responseJson["id"]
# url of the uploaded data e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
url = "$fileServerURL/file/$uploadid/$fileid/$filename"
return (status=httpResponse.status, uploadid=uploadid, fileid=fileid, url=url)
end
end # module

245
src/NATSBridge.js Normal file
View File

@@ -0,0 +1,245 @@
/**
* Bi-Directional Data Bridge - JavaScript Module
* Implements SmartSend and SmartReceive for NATS communication
*/
const { v4: uuidv4 } = require('uuid');
const { decode, encode } = require('base64-url');
const Arrow = require('apache-arrow');
// Constants
const DEFAULT_SIZE_THRESHOLD = 1_000_000; // 1MB
const DEFAULT_NATS_URL = 'nats://localhost:4222';
const DEFAULT_FILESERVER_URL = 'http://localhost:8080/upload';
// Logging helper
function logTrace(correlationId, message) {
const timestamp = new Date().toISOString();
console.log(`[${timestamp}] [Correlation: ${correlationId}] ${message}`);
}
// Message Envelope Class
class MessageEnvelope {
constructor(options = {}) {
this.correlation_id = options.correlation_id || uuidv4();
this.type = options.type || 'json';
this.transport = options.transport || 'direct';
this.payload = options.payload || null;
this.url = options.url || null;
this.metadata = options.metadata || {};
}
static fromJSON(jsonStr) {
const data = JSON.parse(jsonStr);
return new MessageEnvelope({
correlation_id: data.correlation_id,
type: data.type,
transport: data.transport,
payload: data.payload || null,
url: data.url || null,
metadata: data.metadata || {}
});
}
toJSON() {
const obj = {
correlation_id: this.correlation_id,
type: this.type,
transport: this.transport
};
if (this.payload) {
obj.payload = this.payload;
}
if (this.url) {
obj.url = this.url;
}
if (Object.keys(this.metadata).length > 0) {
obj.metadata = this.metadata;
}
return JSON.stringify(obj);
}
}
// SmartSend for JavaScript - Handles transport selection based on payload size
async function SmartSend(subject, data, type = 'json', options = {}) {
const {
natsUrl = DEFAULT_NATS_URL,
fileserverUrl = DEFAULT_FILESERVER_URL,
sizeThreshold = DEFAULT_SIZE_THRESHOLD,
correlationId = uuidv4()
} = options;
logTrace(correlationId, `Starting SmartSend for subject: ${subject}`);
// Serialize data based on type
const payloadBytes = _serializeData(data, type, correlationId);
const payloadSize = payloadBytes.length;
logTrace(correlationId, `Serialized payload size: ${payloadSize} bytes`);
// Decision: Direct vs Link
if (payloadSize < sizeThreshold) {
// Direct path - Base64 encode and send via NATS
const payloadBase64 = encode(payloadBytes);
logTrace(correlationId, `Using direct transport for ${payloadSize} bytes`);
const env = new MessageEnvelope({
correlation_id: correlationId,
type: type,
transport: 'direct',
payload: payloadBase64,
metadata: {
content_length: payloadSize.toString(),
format: 'arrow_ipc_stream'
}
});
await publishMessage(natsUrl, subject, env.toJSON(), correlationId);
return env;
} else {
// Link path - Upload to HTTP server, send URL via NATS
logTrace(correlationId, `Using link transport, uploading to fileserver`);
const url = await uploadToServer(payloadBytes, fileserverUrl, correlationId);
const env = new MessageEnvelope({
correlation_id: correlationId,
type: type,
transport: 'link',
url: url,
metadata: {
content_length: payloadSize.toString(),
format: 'arrow_ipc_stream'
}
});
await publishMessage(natsUrl, subject, env.toJSON(), correlationId);
return env;
}
}
// Helper: Serialize data based on type
function _serializeData(data, type, correlationId) {
if (type === 'json') {
const jsonStr = JSON.stringify(data);
return Buffer.from(jsonStr, 'utf8');
} else if (type === 'table') {
// Table data - convert to Arrow IPC stream
const writer = new Arrow.Writer();
writer.writeTable(data);
return writer.toByteArray();
} else if (type === 'binary') {
// Binary data - treat as binary
if (data instanceof Buffer) {
return data;
} else if (Array.isArray(data)) {
return Buffer.from(data);
} else {
throw new Error('Binary data must be binary (Buffer or Array)');
}
} else {
throw new Error(`Unknown type: ${type}`);
}
}
// Helper: Publish message to NATS
async function publishMessage(natsUrl, subject, message, correlationId) {
const { connect } = require('nats');
try {
const nc = await connect({ servers: [natsUrl] });
await nc.publish(subject, message);
logTrace(correlationId, `Message published to ${subject}`);
nc.close();
} catch (error) {
logTrace(correlationId, `Failed to publish message: ${error.message}`);
throw error;
}
}
// SmartReceive for JavaScript - Handles both direct and link transport
async function SmartReceive(msg, options = {}) {
const {
fileserverUrl = DEFAULT_FILESERVER_URL,
maxRetries = 5,
baseDelay = 100,
maxDelay = 5000
} = options;
const env = MessageEnvelope.fromJSON(msg.data);
logTrace(env.correlation_id, `Processing received message`);
if (env.transport === 'direct') {
logTrace(env.correlation_id, `Direct transport - decoding payload`);
const payloadBytes = decode(env.payload);
const data = _deserializeData(payloadBytes, env.type, env.correlation_id, env.metadata);
return { data, envelope: env };
} else if (env.transport === 'link') {
logTrace(env.correlation_id, `Link transport - fetching from URL`);
const data = await _fetchWithBackoff(env.url, maxRetries, baseDelay, maxDelay, env.correlation_id);
const result = _deserializeData(data, env.type, env.correlation_id, env.metadata);
return { data: result, envelope: env };
} else {
throw new Error(`Unknown transport type: ${env.transport}`);
}
}
// Helper: Fetch with exponential backoff
async function _fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlationId) {
let delay = baseDelay;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const response = await fetch(url);
if (response.ok) {
const buffer = await response.arrayBuffer();
logTrace(correlationId, `Successfully fetched data from ${url} on attempt ${attempt}`);
return new Uint8Array(buffer);
} else {
throw new Error(`Failed to fetch: ${response.status}`);
}
} catch (error) {
logTrace(correlationId, `Attempt ${attempt} failed: ${error.message}`);
if (attempt < maxRetries) {
await new Promise(resolve => setTimeout(resolve, delay));
delay = Math.min(delay * 2, maxDelay);
}
}
}
throw new Error(`Failed to fetch data after ${maxRetries} attempts`);
}
// Helper: Deserialize data based on type
async function _deserializeData(data, type, correlationId, metadata) {
if (type === 'json') {
const jsonStr = new TextDecoder().decode(data);
return JSON.parse(jsonStr);
} else if (type === 'table') {
// Deserialize Arrow IPC stream to Table
const table = Arrow.Table.from(data);
return table;
} else if (type === 'binary') {
// Return binary binary data
return data;
} else {
throw new Error(`Unknown type: ${type}`);
}
}
// Export functions
module.exports = {
SmartSend,
SmartReceive,
MessageEnvelope
};

View File

@@ -0,0 +1,67 @@
#!/usr/bin/env julia
# Scenario 1: Command & Control (Small JSON)
# Tests small JSON payloads (< 1MB) sent directly via NATS
using NATS
using JSON3
using UUIDs
# Include the bridge module
include("../src/julia_bridge.jl")
using .BiDirectionalBridge
# Configuration
const CONTROL_SUBJECT = "control"
const RESPONSE_SUBJECT = "control_response"
const NATS_URL = "nats://localhost:4222"
# Create correlation ID for tracing
correlation_id = string(uuid4())
# Receiver: Listen for control commands
function start_control_listener()
conn = NATS.Connection(NATS_URL)
try
NATS.subscribe(conn, CONTROL_SUBJECT) do msg
log_trace(msg.data)
# Parse the envelope
env = MessageEnvelope(String(msg.data))
# Parse JSON payload
config = JSON3.read(env.payload)
# Execute simulation with parameters
step_size = config.step_size
iterations = config.iterations
# Simulate processing
sleep(0.1) # Simulate some work
# Send acknowledgment
response = Dict(
"status" => "Running",
"correlation_id" => env.correlation_id,
"step_size" => step_size,
"iterations" => iterations
)
NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response))
log_trace("Sent response: $(JSON3.stringify(response))")
end
# Keep listening for 5 seconds
sleep(5)
finally
NATS.close(conn)
end
end
# Helper: Log with correlation ID
function log_trace(message)
timestamp = Dates.now()
println("[$timestamp] [Correlation: $correlation_id] $message")
end
# Run the listener
start_control_listener()

View File

@@ -0,0 +1,34 @@
#!/usr/bin/env node
// Scenario 1: Command & Control (Small JSON)
// Tests small JSON payloads (< 1MB) sent directly via NATS
const { SmartSend } = require('../js_bridge');
// Configuration
const CONTROL_SUBJECT = "control";
const NATS_URL = "nats://localhost:4222";
// Create correlation ID for tracing
const correlationId = require('uuid').v4();
// Sender: Send control command to Julia
async function sendControlCommand() {
const config = {
step_size: 0.01,
iterations: 1000
};
// Send via SmartSend with type="json"
const env = await SmartSend(
CONTROL_SUBJECT,
config,
"json",
{ correlationId }
);
console.log(`Sent control command with correlation_id: ${correlationId}`);
console.log(`Envelope: ${JSON.stringify(env, null, 2)}`);
}
// Run the sender
sendControlCommand().catch(console.error);

View File

@@ -0,0 +1,66 @@
#!/usr/bin/env julia
# Scenario 2: Deep Dive Analysis (Large Arrow Table)
# Tests large Arrow tables (> 1MB) sent via HTTP fileserver
using NATS
using Arrow
using DataFrames
using JSON3
using UUIDs
# Include the bridge module
include("../src/julia_bridge.jl")
using .BiDirectionalBridge
# Configuration
const ANALYSIS_SUBJECT = "analysis_results"
const RESPONSE_SUBJECT = "analysis_response"
const NATS_URL = "nats://localhost:4222"
# Create correlation ID for tracing
correlation_id = string(uuid4())
# Receiver: Listen for analysis results
function start_analysis_listener()
conn = NATS.Connection(NATS_URL)
try
NATS.subscribe(conn, ANALYSIS_SUBJECT) do msg
log_trace("Received message from $(msg.subject)")
# Parse the envelope
env = MessageEnvelope(String(msg.data))
# Use SmartReceive to handle the data
result = SmartReceive(msg)
# Process the data based on type
if result.envelope.type == "table"
df = result.data
log_trace("Received DataFrame with $(nrows(df)) rows")
log_trace("DataFrame columns: $(names(df))")
# Send acknowledgment
response = Dict(
"status" => "Processed",
"correlation_id" => env.correlation_id,
"row_count" => nrows(df)
)
NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response))
end
end
# Keep listening for 10 seconds
sleep(10)
finally
NATS.close(conn)
end
end
# Helper: Log with correlation ID
function log_trace(message)
timestamp = Dates.now()
println("[$timestamp] [Correlation: $correlation_id] $message")
end
# Run the listener
start_analysis_listener()

View File

@@ -0,0 +1,54 @@
#!/usr/bin/env node
// Scenario 2: Deep Dive Analysis (Large Arrow Table)
// Tests large Arrow tables (> 1MB) sent via HTTP fileserver
const { SmartSend } = require('../js_bridge');
// Configuration
const ANALYSIS_SUBJECT = "analysis_results";
const NATS_URL = "nats://localhost:4222";
// Create correlation ID for tracing
const correlationId = require('uuid').v4();
// Sender: Send large Arrow table to Julia
async function sendLargeTable() {
// Create a large DataFrame-like structure (10 million rows)
// For testing, we'll create a smaller but still large table
const numRows = 1000000; // 1 million rows
const data = {
id: Array.from({ length: numRows }, (_, i) => i + 1),
value: Array.from({ length: numRows }, () => Math.random()),
category: Array.from({ length: numRows }, () => ['A', 'B', 'C'][Math.floor(Math.random() * 3)])
};
// Convert to Arrow Table
const { Table, Vector, RecordBatch } = require('apache-arrow');
const idVector = Vector.from(data.id);
const valueVector = Vector.from(data.value);
const categoryVector = Vector.from(data.category);
const table = Table.from({
id: idVector,
value: valueVector,
category: categoryVector
});
// Send via SmartSend with type="table"
const env = await SmartSend(
ANALYSIS_SUBJECT,
table,
"table",
{ correlationId }
);
console.log(`Sent large table with ${numRows} rows`);
console.log(`Correlation ID: ${correlationId}`);
console.log(`Transport: ${env.transport}`);
console.log(`URL: ${env.url || 'N/A'}`);
}
// Run the sender
sendLargeTable().catch(console.error);

View File

@@ -0,0 +1,66 @@
#!/usr/bin/env julia
# Scenario 3: Julia-to-Julia Service Communication
# Tests bi-directional communication between two Julia services
using NATS
using Arrow
using DataFrames
using JSON3
using UUIDs
# Include the bridge module
include("../src/julia_bridge.jl")
using .BiDirectionalBridge
# Configuration
const SUBJECT1 = "julia_to_js"
const SUBJECT2 = "js_to_julia"
const RESPONSE_SUBJECT = "response"
const NATS_URL = "nats://localhost:4222"
# Create correlation ID for tracing
correlation_id = string(uuid4())
# Julia-to-Julia Test: Large Arrow Table
function test_julia_to_julia_large_table()
conn = NATS.Connection(NATS_URL)
try
# Subscriber on SUBJECT2 to receive data from Julia sender
NATS.subscribe(conn, SUBJECT2) do msg
log_trace("[$(Dates.now())] Received on $SUBJECT2")
# Use SmartReceive to handle the data
result = SmartReceive(msg)
# Check transport type
if result.envelope.transport == "direct"
log_trace("Received direct transport with $(length(result.data)) bytes")
else
# For link transport, result.data is the URL
log_trace("Received link transport at $(result.data)")
end
# Send response back
response = Dict(
"status" => "Processed",
"correlation_id" => result.envelope.correlation_id,
"timestamp" => Dates.now()
)
NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response))
end
# Keep listening
sleep(5)
finally
NATS.close(conn)
end
end
# Helper: Log with correlation ID
function log_trace(message)
timestamp = Dates.now()
println("[$timestamp] [Correlation: $correlation_id] $message")
end
# Run the test
test_julia_to_julia_large_table()

148
test/scenario_tests.md Normal file
View File

@@ -0,0 +1,148 @@
# Test Scenarios for Bi-Directional Data Bridge
## Scenario 1: Command & Control (Small JSON)
Tests small JSON payloads (< 1MB) sent directly via NATS.
### Julia (Receiver)
```julia
using NATS
using JSON3
# Subscribe to control subject
subscribe(nats, "control") do msg
env = MessageEnvelope(String(msg.data))
# Parse JSON payload
config = JSON3.read(env.payload)
# Execute simulation with parameters
step_size = config.step_size
iterations = config.iterations
# Send acknowledgment
response = Dict("status" => "Running", "correlation_id" => env.correlation_id)
publish(nats, "control_response", JSON3.stringify(response))
end
```
### JavaScript (Sender)
```javascript
const { SmartSend } = require('./js_bridge');
// Create small JSON config
const config = {
step_size: 0.01,
iterations: 1000
};
// Send via SmartSend with type="json"
await SmartSend("control", config, "json");
```
## Scenario 2: Deep Dive Analysis (Large Arrow Table)
Tests large Arrow tables (> 1MB) sent via HTTP fileserver.
### Julia (Sender)
```julia
using Arrow
using DataFrames
# Create large DataFrame (500MB, 10 million rows)
df = DataFrame(
id = 1:10_000_000,
value = rand(10_000_000),
category = rand(["A", "B", "C"], 10_000_000)
)
# Convert to Arrow IPC stream and send
await SmartSend("analysis_results", df, "table");
```
### JavaScript (Receiver)
```javascript
const { SmartReceive } = require('./js_bridge');
// Receive message with URL
const result = await SmartReceive(msg);
// Fetch data from HTTP server
const table = result.data;
// Load into Perspective.js or D3
// Use table data for visualization
```
## Scenario 3: Live Binary Processing
Tests binary data (binary) sent from JS to Julia for FFT/transcription.
### JavaScript (Sender)
```javascript
const { SmartSend } = require('./js_bridge');
// Capture binary chunk (2 seconds, 44.1kHz, 1 channel)
const binaryData = await navigator.mediaDevices.getUserMedia({ binary: true });
// Send as binary with metadata headers
await SmartSend("binary_input", binaryData, "binary", {
metadata: {
sample_rate: 44100,
channels: 1
}
});
```
### Julia (Receiver)
```julia
using WAV
using DSP
# Receive binary data
function process_binary(data)
# Perform FFT or AI transcription
spectrum = fft(data)
# Send results back (JSON + Arrow table)
results = Dict("transcription" => "sample text", "spectrum" => spectrum)
await SmartSend("binary_output", results, "json")
end
```
## Scenario 4: Catch-Up (JetStream)
Tests temporal decoupling with NATS JetStream.
### Julia (Producer)
```julia
# Publish to JetStream
using NATS
function publish_health_status(nats)
jetstream = JetStream(nats, "health_updates")
while true
status = Dict("cpu" => rand(), "memory" => rand())
publish(jetstream, "health", status)
sleep(5) # Every 5 seconds
end
end
```
### JavaScript (Consumer)
```javascript
const { connect } = require('nats');
const nc = await connect({ servers: ['nats://localhost:4222'] });
const js = nc.jetstream();
// Request replay from last 10 minutes
const consumer = await js.pullSubscribe("health", {
durable_name: "catchup",
max_batch: 100,
max_ack_wait: 30000
});
// Process historical and real-time messages
for await (const msg of consumer) {
const result = await SmartReceive(msg);
// Process the data
msg.ack();
}

121
test/test_large_payload.jl Normal file
View File

@@ -0,0 +1,121 @@
#!/usr/bin/env julia
# Test script for large payload testing using binary transport
# Tests sending a large file (> 1MB) via smartsend with binary type
using NATS, JSON, UUIDs, Dates
# Include the bridge module
include("../src/NATSBridge.jl")
using .NATSBridge
# Configuration
const SUBJECT = "/large_binary_test"
const NATS_URL = "nats.yiem.cc"
const FILESERVER_URL = "http://192.168.88.104:8080"
# Create correlation ID for tracing
correlation_id = string(uuid4())
# File path for large binary payload test
const LARGE_FILE_PATH = "./test.zip"
# Helper: Log with correlation ID
function log_trace(message)
timestamp = Dates.now()
println("[$timestamp] [Correlation: $correlation_id] $message")
end
# Sender: Send large binary file via smartsend
function test_large_binary_send()
conn = NATS.connect(NATS_URL)
# Read the large file as binary data
log_trace("Reading large file: $LARGE_FILE_PATH")
file_data = read(LARGE_FILE_PATH)
file_size = length(file_data)
log_trace("File size: $file_size bytes")
# Use smartsend with binary type - will automatically use link transport
# if file size exceeds the threshold (1MB by default)
env = NATSBridge.smartsend(
SUBJECT,
file_data,
"binary",
nats_url = NATS_URL,
fileserver_url = FILESERVER_URL;
dataname="test.zip"
)
log_trace("Sent message with transport: $(env.transport)")
log_trace("Envelope type: $(env.type)")
# Check if link transport was used
if env.transport == "link"
log_trace("Using link transport - file uploaded to HTTP server")
log_trace("URL: $(env.url)")
else
log_trace("Using direct transport - payload sent via NATS")
end
NATS.drain(conn)
end
# Receiver: Listen for messages and verify large payload handling
function test_large_binary_receive()
conn = NATS.connect(NATS_URL)
NATS.subscribe(conn, SUBJECT) do msg
log_trace("Received message on $(msg.subject)")
log_trace("Received message:\n$msg")
# Use SmartReceive to handle the data
result = SmartReceive(msg)
# Check transport type
if result.envelope.transport == "direct"
log_trace("Received direct transport with $(length(result.data)) bytes")
else
# For link transport, result.data is the URL
log_trace("Received link transport at $(result.data)")
end
# Verify the received data matches the original
if result.envelope.type == "binary"
if isa(result.data, Vector{UInt8})
file_size = length(result.data)
log_trace("Received $(file_size) bytes of binary data")
# Save received data to a test file
output_path = "test_output.bin"
write(output_path, result.data)
log_trace("Saved received data to $output_path")
# Verify file size
original_size = length(read(LARGE_FILE_PATH))
if file_size == original_size
log_trace("SUCCESS: File size matches! Original: $original_size bytes")
else
log_trace("WARNING: File size mismatch! Original: $original_size, Received: $file_size")
end
end
end
end
# Keep listening for 10 seconds
sleep(10)
NATS.drain(conn)
end
# Run the test
println("Starting large binary payload test...")
println("Correlation ID: $correlation_id")
println("Large file: $LARGE_FILE_PATH")
# Run sender first
println("start smartsend")
test_large_binary_send()
# Run receiver
println("testing smartreceive")
test_large_binary_receive()
println("Test completed.")