commit 9ecd81c400b0a49d762eafc6e035617ef673f547 Author: narawat Date: Tue Feb 10 06:55:29 2026 +0700 1st commit diff --git a/AI_prompt.txt b/AI_prompt.txt new file mode 100644 index 0000000..8e66928 --- /dev/null +++ b/AI_prompt.txt @@ -0,0 +1,14 @@ +Consider the following scenarios: +Scenario 1: The "Command & Control" Loop (Low Latency)Focus: Small payloads, Core NATS, bi-directional JSON.The Action: A user on a JavaScript dashboard clicks a "Start Simulation" button. This sends a JSON configuration (parameters like step_size and iterations) to Julia.The Flow: * JS (Sender): Recognizes the message is small ($< 10KB$). Packages it as a direct transport JSON envelope.Julia (Receiver): Listens on the NATS subject, decodes the JSON, and immediately acknowledges receipt with a "Running" status.Project Requirement Met: Fast, low-overhead communication for control signals without involving the fileserver. +Scenario 2: The "Deep Dive" Analysis (High Bandwidth)Focus: Large Arrow tables, Claim-Check pattern, Julia to JS.The Action: Julia finishes a heavy computation and produces a 500MB DataFrame with 10 million rows. It needs to send this to the JS frontend for visualization (e.g., using Perspective.js or D3).The Flow:Julia (Sender): Converts the DataFrame to an Arrow IPC stream. It sees the size is $> 1MB$, so it uploads the bytes to the HTTP fileserver. It then publishes a NATS message with transport: "link" and the URL.JS (Receiver): Receives the URL, fetches the data via fetch(), and uses tableFromIPC() to load the data into memory with zero-copy.Project Requirement Met: Handling massive datasets that exceed NATS message limits while maintaining data integrity across languages. +Scenario 3: Live Audio/Signal Processing (Multimedia & Metadata)Focus: Raw binary, bi-directional streaming, NATS Headers.The Action: The JS client captures a 2-second "chunk" of microphone audio. 
It needs Julia to perform a Fast Fourier Transform (FFT) or AI transcription.The Flow:JS (Sender): Sends the raw binary WAV/PCM data. It uses NATS Headers to store the metadata ($fs = 44.1kHz$, $channels = 1$) to keep the payload purely binary.Julia (Receiver): Processes the audio and sends back a JSON result (the transcription) and an Arrow Table (the frequency spectrum data).Project Requirement Met: Bi-directional flow involving mixed media (Audio) and technical results (Arrow). +Scenario 4: The "Catch-Up" (Persistence & JetStream)Focus: NATS JetStream, late-joining consumers, state sync.The Action: Julia is constantly publishing "System Health" updates. The JS dashboard is closed for 10 minutes. When the user re-opens the dashboard, they need to see the last 10 minutes of history.The Flow:NATS (Server): Uses a JetStream with a Limits retention policy.JS (Consumer): Connects and requests a "Replay" from the last 10 minutes. It receives a mix of direct (small updates) and link (historical snapshots) messages.Project Requirement Met: Temporal decoupling—consumers can receive data that was sent while they were offline. + +Role: Principal Systems Architect & Lead Software Engineer.Objective: Implement a high-performance, bi-directional data bridge between a Julia service and a JavaScript (Node.js) service using NATS (Core & JetStream).⚠️ STRICT ARCHITECTURAL CONSTRAINTS (Non-Negotiable)Transport Strategy (Claim-Check Pattern):Direct Path: If payload is < 1MB, send data directly via NATS inside the message envelope (Base64 encoded).Link Path: If payload is > 1MB, upload to a shared HTTP fileserver/store. The NATS message must only contain the metadata and the download URL.Tabular Data Format: * MUST use Apache Arrow IPC Stream for all tables/DataFrames. 
No CSV or standard JSON-serialization of tables allowed.System Symmetry: * Both services must function as Producers AND Consumers.Modular Elegance: * Implementation must be abstracted into a SmartSend function and a SmartReceive handler. The developer calling these functions should not need to care if the data is going via NATS direct or HTTP link.Technical Stack & Use CasesJulia: NATS.jl, Arrow.jl, JSON3.jl, HTTP.jl.Node.js: nats.js, apache-arrow.Scenarios to Support: * Large Data: Sending a 500MB Arrow table from Julia $\rightarrow$ JS.Media: Sending a 5MB WAV file from JS $\rightarrow$ Julia.Signals: Sending small JSON control commands ($< 10KB$) directly via NATS.Implementation Requirements1. Unified JSON Envelope:Define a schema containing: correlation_id (UUID), type (table/binary/json), transport (direct/link), payload (if direct), and url (if link).2. The Julia Module:Implement SmartSend(subject, data, type): Handles Arrow serialization to an IOBuffer, checks size, and manages HTTP uploads for large blobs.Implement SmartReceive(msg): Parses envelope, handles the HTTP fetch with Exponential Backoff (to avoid race conditions), and restores the DataFrame.Include a basic HTTP.listen server to serve as the temporary storage.3. The JavaScript Module:Implement a symmetric SmartSend using nats.js and apache-arrow.Implement a JetStream Pull Consumer for SmartReceive to ensure backpressure and memory safety.4. Performance & Reliability:Demonstrate "Zero-Copy" reading of the Arrow IPC stream on the JS side.Log the correlation_id at every stage for distributed tracing. 
+ + + + + + + diff --git a/Manifest.toml b/Manifest.toml new file mode 100644 index 0000000..1e89673 --- /dev/null +++ b/Manifest.toml @@ -0,0 +1,831 @@ +# This file is machine-generated - editing it directly is not advised + +julia_version = "1.12.4" +manifest_format = "2.0" +project_hash = "be1e3c2d8b7f4f0ee7375c94aaf704ce73ba57b9" + +[[deps.AliasTables]] +deps = ["PtrArrays", "Random"] +git-tree-sha1 = "9876e1e164b144ca45e9e3198d0b689cadfed9ff" +uuid = "66dad0bd-aa9a-41b7-9441-69ab47430ed8" +version = "1.1.3" + +[[deps.ArgTools]] +uuid = "0dad84c5-d112-42e6-8d28-ef12dabb789f" +version = "1.1.2" + +[[deps.Arrow]] +deps = ["ArrowTypes", "BitIntegers", "CodecLz4", "CodecZstd", "ConcurrentUtilities", "DataAPI", "Dates", "EnumX", "Mmap", "PooledArrays", "SentinelArrays", "StringViews", "Tables", "TimeZones", "TranscodingStreams", "UUIDs"] +git-tree-sha1 = "4a69a3eadc1f7da78d950d1ef270c3a62c1f7e01" +uuid = "69666777-d1a9-59fb-9406-91d4454c9d45" +version = "2.8.1" + +[[deps.ArrowTypes]] +deps = ["Sockets", "UUIDs"] +git-tree-sha1 = "404265cd8128a2515a81d5eae16de90fdef05101" +uuid = "31f734f8-188a-4ce0-8406-c8a06bd891cd" +version = "2.3.0" + +[[deps.Artifacts]] +uuid = "56f22d72-fd6d-98f1-02f0-08ddc0907c33" +version = "1.11.0" + +[[deps.Base64]] +uuid = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" +version = "1.11.0" + +[[deps.BitFlags]] +git-tree-sha1 = "0691e34b3bb8be9307330f88d1a3c3f25466c24d" +uuid = "d1d4a3ce-64b1-5f1a-9ba4-7e7e69966f35" +version = "0.1.9" + +[[deps.BitIntegers]] +deps = ["Random"] +git-tree-sha1 = "091d591a060e43df1dd35faab3ca284925c48e46" +uuid = "c3b6d118-76ef-56ca-8cc7-ebb389d030a1" +version = "0.3.7" + +[[deps.BufferedStreams]] +git-tree-sha1 = "6863c5b7fc997eadcabdbaf6c5f201dc30032643" +uuid = "e1450e63-4bb3-523b-b2a4-4ffa8c0fd77d" +version = "1.2.2" + +[[deps.CSV]] +deps = ["CodecZlib", "Dates", "FilePathsBase", "InlineStrings", "Mmap", "Parsers", "PooledArrays", "PrecompileTools", "SentinelArrays", "Tables", "Unicode", "WeakRefStrings", 
"WorkerUtilities"] +git-tree-sha1 = "deddd8725e5e1cc49ee205a1964256043720a6c3" +uuid = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" +version = "0.10.15" + +[[deps.CodeTracking]] +deps = ["InteractiveUtils", "UUIDs"] +git-tree-sha1 = "b7231a755812695b8046e8471ddc34c8268cbad5" +uuid = "da1fd8a2-8d9e-5ec2-8556-3022fb5608a2" +version = "3.0.0" + +[[deps.CodecBase]] +deps = ["TranscodingStreams"] +git-tree-sha1 = "40956acdbef3d8c7cc38cba42b56034af8f8581a" +uuid = "6c391c72-fb7b-5838-ba82-7cfb1bcfecbf" +version = "0.3.4" + +[[deps.CodecLz4]] +deps = ["Lz4_jll", "TranscodingStreams"] +git-tree-sha1 = "d58afcd2833601636b48ee8cbeb2edcb086522c2" +uuid = "5ba52731-8f18-5e0d-9241-30f10d1ec561" +version = "0.4.6" + +[[deps.CodecZlib]] +deps = ["TranscodingStreams", "Zlib_jll"] +git-tree-sha1 = "962834c22b66e32aa10f7611c08c8ca4e20749a9" +uuid = "944b1d66-785c-5afd-91f1-9de20f533193" +version = "0.7.8" + +[[deps.CodecZstd]] +deps = ["TranscodingStreams", "Zstd_jll"] +git-tree-sha1 = "da54a6cd93c54950c15adf1d336cfd7d71f51a56" +uuid = "6b39b394-51ab-5f42-8807-6242bab2b4c2" +version = "0.8.7" + +[[deps.Compat]] +deps = ["TOML", "UUIDs"] +git-tree-sha1 = "9d8a54ce4b17aa5bdce0ea5c34bc5e7c340d16ad" +uuid = "34da2185-b29b-5c13-b0c7-acf172513d20" +version = "4.18.1" +weakdeps = ["Dates", "LinearAlgebra"] + + [deps.Compat.extensions] + CompatLinearAlgebraExt = "LinearAlgebra" + +[[deps.Compiler]] +git-tree-sha1 = "382d79bfe72a406294faca39ef0c3cef6e6ce1f1" +uuid = "807dbc54-b67e-4c79-8afb-eafe4df6f2e1" +version = "0.1.1" + +[[deps.CompilerSupportLibraries_jll]] +deps = ["Artifacts", "Libdl"] +uuid = "e66e0078-7015-5450-92f7-15fbd957f2ae" +version = "1.3.0+1" + +[[deps.ConcurrentUtilities]] +deps = ["Serialization", "Sockets"] +git-tree-sha1 = "d9d26935a0bcffc87d2613ce14c527c99fc543fd" +uuid = "f0e56b4a-5159-44fe-b623-3e5288b988bb" +version = "2.5.0" + +[[deps.Crayons]] +git-tree-sha1 = "249fe38abf76d48563e2f4556bebd215aa317e15" +uuid = "a8cc5b0e-0ffa-5ad4-8c14-923d3ee1735f" +version = "4.1.1" + 
+[[deps.DataAPI]] +git-tree-sha1 = "abe83f3a2f1b857aac70ef8b269080af17764bbe" +uuid = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a" +version = "1.16.0" + +[[deps.DataFrames]] +deps = ["Compat", "DataAPI", "DataStructures", "Future", "InlineStrings", "InvertedIndices", "IteratorInterfaceExtensions", "LinearAlgebra", "Markdown", "Missings", "PooledArrays", "PrecompileTools", "PrettyTables", "Printf", "Random", "Reexport", "SentinelArrays", "SortingAlgorithms", "Statistics", "TableTraits", "Tables", "Unicode"] +git-tree-sha1 = "d8928e9169ff76c6281f39a659f9bca3a573f24c" +uuid = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" +version = "1.8.1" + +[[deps.DataStructures]] +deps = ["OrderedCollections"] +git-tree-sha1 = "e357641bb3e0638d353c4b29ea0e40ea644066a6" +uuid = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" +version = "0.19.3" + +[[deps.DataValueInterfaces]] +git-tree-sha1 = "bfc1187b79289637fa0ef6d4436ebdfe6905cbd6" +uuid = "e2d170a0-9d28-54be-80f0-106bbe20a464" +version = "1.0.0" + +[[deps.Dates]] +deps = ["Printf"] +uuid = "ade2ca70-3891-5945-98fb-dc099432e06a" +version = "1.11.0" + +[[deps.Distributions]] +deps = ["AliasTables", "FillArrays", "LinearAlgebra", "PDMats", "Printf", "QuadGK", "Random", "SpecialFunctions", "Statistics", "StatsAPI", "StatsBase", "StatsFuns"] +git-tree-sha1 = "fbcc7610f6d8348428f722ecbe0e6cfe22e672c6" +uuid = "31c24e10-a181-5473-b8eb-7969acd0382f" +version = "0.25.123" + + [deps.Distributions.extensions] + DistributionsChainRulesCoreExt = "ChainRulesCore" + DistributionsDensityInterfaceExt = "DensityInterface" + DistributionsTestExt = "Test" + + [deps.Distributions.weakdeps] + ChainRulesCore = "d360d2e6-b24c-11e9-a2a3-2a2ae2dbcce4" + DensityInterface = "b429d917-457f-4dbc-8f4c-0cc954292b1d" + Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" + +[[deps.DocStringExtensions]] +git-tree-sha1 = "7442a5dfe1ebb773c29cc2962a8980f47221d76c" +uuid = "ffbed154-4ef7-542d-bbb7-c09d3a79fcae" +version = "0.9.5" + +[[deps.Downloads]] +deps = ["ArgTools", "FileWatching", 
"LibCURL", "NetworkOptions"] +uuid = "f43a241f-c20a-4ad4-852c-f6b1247861c6" +version = "1.7.0" + +[[deps.EnumX]] +git-tree-sha1 = "7bebc8aad6ee6217c78c5ddcf7ed289d65d0263e" +uuid = "4e289a0a-7415-4d19-859d-a7e5c4648b56" +version = "1.0.6" + +[[deps.ExceptionUnwrapping]] +deps = ["Test"] +git-tree-sha1 = "d36f682e590a83d63d1c7dbd287573764682d12a" +uuid = "460bff9d-24e4-43bc-9d9f-a8973cb893f4" +version = "0.1.11" + +[[deps.ExprTools]] +git-tree-sha1 = "27415f162e6028e81c72b82ef756bf321213b6ec" +uuid = "e2ba6199-217a-4e67-a87a-7c52f15ade04" +version = "0.1.10" + +[[deps.FilePathsBase]] +deps = ["Compat", "Dates"] +git-tree-sha1 = "3bab2c5aa25e7840a4b065805c0cdfc01f3068d2" +uuid = "48062228-2e41-5def-b9a4-89aafe57970f" +version = "0.9.24" +weakdeps = ["Mmap", "Test"] + + [deps.FilePathsBase.extensions] + FilePathsBaseMmapExt = "Mmap" + FilePathsBaseTestExt = "Test" + +[[deps.FileWatching]] +uuid = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee" +version = "1.11.0" + +[[deps.FillArrays]] +deps = ["LinearAlgebra"] +git-tree-sha1 = "2f979084d1e13948a3352cf64a25df6bd3b4dca3" +uuid = "1a297f60-69ca-5386-bcde-b61e274b549b" +version = "1.16.0" + + [deps.FillArrays.extensions] + FillArraysPDMatsExt = "PDMats" + FillArraysSparseArraysExt = "SparseArrays" + FillArraysStaticArraysExt = "StaticArrays" + FillArraysStatisticsExt = "Statistics" + + [deps.FillArrays.weakdeps] + PDMats = "90014a1f-27ba-587c-ab20-58faa44d9150" + SparseArrays = "2f01184e-e22b-5df5-ae63-d93ebab69eaf" + StaticArrays = "90137ffa-7385-5640-81b9-e52037218182" + Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2" + +[[deps.Future]] +deps = ["Random"] +uuid = "9fa8497b-333b-5362-9e8d-4d0656e87820" +version = "1.11.0" + +[[deps.GeneralUtils]] +deps = ["CSV", "DataFrames", "DataStructures", "Dates", "Distributions", "JSON", "NATS", "PrettyPrinting", "Random", "Revise", "SHA", "UUIDs"] +git-tree-sha1 = "e28ca4df47d0c46d04716422bef6adb660f33dc3" +repo-rev = "main" +repo-url = "https://git.yiem.cc/ton/GeneralUtils" +uuid = 
"c6c72f09-b708-4ac8-ac7c-2084d70108fe" +version = "0.3.1" + +[[deps.HTTP]] +deps = ["Base64", "CodecZlib", "ConcurrentUtilities", "Dates", "ExceptionUnwrapping", "Logging", "LoggingExtras", "MbedTLS", "NetworkOptions", "OpenSSL", "PrecompileTools", "Random", "SimpleBufferStream", "Sockets", "URIs", "UUIDs"] +git-tree-sha1 = "5e6fe50ae7f23d171f44e311c2960294aaa0beb5" +uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3" +version = "1.10.19" + +[[deps.HashArrayMappedTries]] +git-tree-sha1 = "2eaa69a7cab70a52b9687c8bf950a5a93ec895ae" +uuid = "076d061b-32b6-4027-95e0-9a2c6f6d7e74" +version = "0.2.0" + +[[deps.HypergeometricFunctions]] +deps = ["LinearAlgebra", "OpenLibm_jll", "SpecialFunctions"] +git-tree-sha1 = "68c173f4f449de5b438ee67ed0c9c748dc31a2ec" +uuid = "34004b35-14d8-5ef3-9330-4cdb6864b03a" +version = "0.3.28" + +[[deps.InlineStrings]] +git-tree-sha1 = "8f3d257792a522b4601c24a577954b0a8cd7334d" +uuid = "842dd82b-1e85-43dc-bf29-5d0ee9dffc48" +version = "1.4.5" +weakdeps = ["ArrowTypes", "Parsers"] + + [deps.InlineStrings.extensions] + ArrowTypesExt = "ArrowTypes" + ParsersExt = "Parsers" + +[[deps.InteractiveUtils]] +deps = ["Markdown"] +uuid = "b77e0a4c-d291-57a0-90e8-8db25a27a240" +version = "1.11.0" + +[[deps.InvertedIndices]] +git-tree-sha1 = "6da3c4316095de0f5ee2ebd875df8721e7e0bdbe" +uuid = "41ab1584-1d38-5bbf-9106-f11c6c58b48f" +version = "1.3.1" + +[[deps.IrrationalConstants]] +git-tree-sha1 = "b2d91fe939cae05960e760110b328288867b5758" +uuid = "92d709cd-6900-40b7-9082-c6be49f344b6" +version = "0.2.6" + +[[deps.IteratorInterfaceExtensions]] +git-tree-sha1 = "a3f24677c21f5bbe9d2a714f95dcd58337fb2856" +uuid = "82899510-4779-5014-852e-03e436cf321d" +version = "1.0.0" + +[[deps.JLLWrappers]] +deps = ["Artifacts", "Preferences"] +git-tree-sha1 = "0533e564aae234aff59ab625543145446d8b6ec2" +uuid = "692b3bcd-3c85-4b1f-b108-f13ce0eb3210" +version = "1.7.1" + +[[deps.JSON]] +deps = ["Dates", "Logging", "Parsers", "PrecompileTools", "StructUtils", "UUIDs", "Unicode"] 
+git-tree-sha1 = "b3ad4a0255688dcb895a52fafbaae3023b588a90" +uuid = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" +version = "1.4.0" +weakdeps = ["ArrowTypes"] + + [deps.JSON.extensions] + JSONArrowExt = ["ArrowTypes"] + +[[deps.JSON3]] +deps = ["Dates", "Mmap", "Parsers", "PrecompileTools", "StructTypes", "UUIDs"] +git-tree-sha1 = "411eccfe8aba0814ffa0fdf4860913ed09c34975" +uuid = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" +version = "1.14.3" +weakdeps = ["ArrowTypes"] + + [deps.JSON3.extensions] + JSON3ArrowExt = ["ArrowTypes"] + +[[deps.JuliaInterpreter]] +deps = ["CodeTracking", "InteractiveUtils", "Random", "UUIDs"] +git-tree-sha1 = "80580012d4ed5a3e8b18c7cd86cebe4b816d17a6" +uuid = "aa1ae85d-cabe-5617-a682-6adf51b2e16a" +version = "0.10.9" + +[[deps.JuliaSyntaxHighlighting]] +deps = ["StyledStrings"] +uuid = "ac6e5ff7-fb65-4e79-a425-ec3bc9c03011" +version = "1.12.0" + +[[deps.LaTeXStrings]] +git-tree-sha1 = "dda21b8cbd6a6c40d9d02a73230f9d70fed6918c" +uuid = "b964fa9f-0449-5b57-a5c2-d3ea65f4040f" +version = "1.4.0" + +[[deps.LibCURL]] +deps = ["LibCURL_jll", "MozillaCACerts_jll"] +uuid = "b27032c2-a3e7-50c8-80cd-2d36dbcbfd21" +version = "0.6.4" + +[[deps.LibCURL_jll]] +deps = ["Artifacts", "LibSSH2_jll", "Libdl", "OpenSSL_jll", "Zlib_jll", "nghttp2_jll"] +uuid = "deac9b47-8bc7-5906-a0fe-35ac56dc84c0" +version = "8.15.0+0" + +[[deps.LibGit2]] +deps = ["LibGit2_jll", "NetworkOptions", "Printf", "SHA"] +uuid = "76f85450-5226-5b5a-8eaa-529ad045b433" +version = "1.11.0" + +[[deps.LibGit2_jll]] +deps = ["Artifacts", "LibSSH2_jll", "Libdl", "OpenSSL_jll"] +uuid = "e37daf67-58a4-590a-8e99-b0245dd2ffc5" +version = "1.9.0+0" + +[[deps.LibSSH2_jll]] +deps = ["Artifacts", "Libdl", "OpenSSL_jll"] +uuid = "29816b5a-b9ab-546f-933c-edad1886dfa8" +version = "1.11.3+1" + +[[deps.Libdl]] +uuid = "8f399da3-3557-5675-b5ff-fb832c97cbdb" +version = "1.11.0" + +[[deps.LinearAlgebra]] +deps = ["Libdl", "OpenBLAS_jll", "libblastrampoline_jll"] +uuid = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" 
+version = "1.12.0" + +[[deps.LogExpFunctions]] +deps = ["DocStringExtensions", "IrrationalConstants", "LinearAlgebra"] +git-tree-sha1 = "13ca9e2586b89836fd20cccf56e57e2b9ae7f38f" +uuid = "2ab3a3ac-af41-5b50-aa03-7779005ae688" +version = "0.3.29" + + [deps.LogExpFunctions.extensions] + LogExpFunctionsChainRulesCoreExt = "ChainRulesCore" + LogExpFunctionsChangesOfVariablesExt = "ChangesOfVariables" + LogExpFunctionsInverseFunctionsExt = "InverseFunctions" + + [deps.LogExpFunctions.weakdeps] + ChainRulesCore = "d360d2e6-b24c-11e9-a2a3-2a2ae2dbcce4" + ChangesOfVariables = "9e997f8a-9a97-42d5-a9f1-ce6bfc15e2c0" + InverseFunctions = "3587e190-3f89-42d0-90ee-14403ec27112" + +[[deps.Logging]] +uuid = "56ddb016-857b-54e1-b83d-db4d58db5568" +version = "1.11.0" + +[[deps.LoggingExtras]] +deps = ["Dates", "Logging"] +git-tree-sha1 = "f00544d95982ea270145636c181ceda21c4e2575" +uuid = "e6f89c97-d47a-5376-807f-9c37f3926c36" +version = "1.2.0" + +[[deps.LoweredCodeUtils]] +deps = ["CodeTracking", "Compiler", "JuliaInterpreter"] +git-tree-sha1 = "65ae3db6ab0e5b1b5f217043c558d9d1d33cc88d" +uuid = "6f1432cf-f94c-5a45-995e-cdbf5db27b0b" +version = "3.5.0" + +[[deps.Lz4_jll]] +deps = ["Artifacts", "JLLWrappers", "Libdl"] +git-tree-sha1 = "191686b1ac1ea9c89fc52e996ad15d1d241d1e33" +uuid = "5ced341a-0733-55b8-9ab6-a4889d929147" +version = "1.10.1+0" + +[[deps.Markdown]] +deps = ["Base64", "JuliaSyntaxHighlighting", "StyledStrings"] +uuid = "d6f4376e-aef5-505a-96c1-9c027394607a" +version = "1.11.0" + +[[deps.MbedTLS]] +deps = ["Dates", "MbedTLS_jll", "MozillaCACerts_jll", "NetworkOptions", "Random", "Sockets"] +git-tree-sha1 = "c067a280ddc25f196b5e7df3877c6b226d390aaf" +uuid = "739be429-bea8-5141-9913-cc70e7f3736d" +version = "1.1.9" + +[[deps.MbedTLS_jll]] +deps = ["Artifacts", "JLLWrappers", "Libdl"] +git-tree-sha1 = "ff69a2b1330bcb730b9ac1ab7dd680176f5896b8" +uuid = "c8ffd9c3-330d-5841-b78e-0817d7145fa1" +version = "2.28.1010+0" + +[[deps.Missings]] +deps = ["DataAPI"] +git-tree-sha1 
= "ec4f7fbeab05d7747bdf98eb74d130a2a2ed298d" +uuid = "e1d29d7a-bbdc-5cf2-9ac0-f12de2c33e28" +version = "1.2.0" + +[[deps.Mmap]] +uuid = "a63ad114-7e13-5084-954f-fe012c677804" +version = "1.11.0" + +[[deps.Mocking]] +deps = ["Compat", "ExprTools"] +git-tree-sha1 = "2c140d60d7cb82badf06d8783800d0bcd1a7daa2" +uuid = "78c3b35d-d492-501b-9361-3d52fe80e533" +version = "0.8.1" + +[[deps.MozillaCACerts_jll]] +uuid = "14a3606d-f60d-562e-9121-12d972cd8159" +version = "2025.11.4" + +[[deps.NATS]] +deps = ["Base64", "BufferedStreams", "CodecBase", "Dates", "DocStringExtensions", "JSON3", "MbedTLS", "NanoDates", "Random", "ScopedValues", "Sockets", "Sodium", "StructTypes", "URIs"] +git-tree-sha1 = "d9d9a189fb9155a460e6b5e8966bf6a66737abf8" +uuid = "55e73f9c-eeeb-467f-b4cc-a633fde63d2a" +version = "0.1.0" + +[[deps.NanoDates]] +deps = ["Dates", "Parsers"] +git-tree-sha1 = "850a0557ae5934f6e67ac0dc5ca13d0328422d1f" +uuid = "46f1a544-deae-4307-8689-c12aa3c955c6" +version = "1.0.3" + +[[deps.NetworkOptions]] +uuid = "ca575930-c2e3-43a9-ace4-1e988b2c1908" +version = "1.3.0" + +[[deps.OpenBLAS_jll]] +deps = ["Artifacts", "CompilerSupportLibraries_jll", "Libdl"] +uuid = "4536629a-c528-5b80-bd46-f80d51c5b363" +version = "0.3.29+0" + +[[deps.OpenLibm_jll]] +deps = ["Artifacts", "Libdl"] +uuid = "05823500-19ac-5b8b-9628-191a04bc5112" +version = "0.8.7+0" + +[[deps.OpenSSL]] +deps = ["BitFlags", "Dates", "MozillaCACerts_jll", "NetworkOptions", "OpenSSL_jll", "Sockets"] +git-tree-sha1 = "1d1aaa7d449b58415f97d2839c318b70ffb525a0" +uuid = "4d8831e6-92b7-49fb-bdf8-b643e874388c" +version = "1.6.1" + +[[deps.OpenSSL_jll]] +deps = ["Artifacts", "Libdl"] +uuid = "458c3c95-2e84-50aa-8efc-19380b2a3a95" +version = "3.5.4+0" + +[[deps.OpenSpecFun_jll]] +deps = ["Artifacts", "CompilerSupportLibraries_jll", "JLLWrappers", "Libdl"] +git-tree-sha1 = "1346c9208249809840c91b26703912dff463d335" +uuid = "efe28fd5-8261-553b-a9e1-b2916fc3738e" +version = "0.5.6+0" + +[[deps.OrderedCollections]] +git-tree-sha1 
= "05868e21324cede2207c6f0f466b4bfef6d5e7ee" +uuid = "bac558e1-5e72-5ebc-8fee-abe8a469f55d" +version = "1.8.1" + +[[deps.PDMats]] +deps = ["LinearAlgebra", "SparseArrays", "SuiteSparse"] +git-tree-sha1 = "e4cff168707d441cd6bf3ff7e4832bdf34278e4a" +uuid = "90014a1f-27ba-587c-ab20-58faa44d9150" +version = "0.11.37" +weakdeps = ["StatsBase"] + + [deps.PDMats.extensions] + StatsBaseExt = "StatsBase" + +[[deps.Parsers]] +deps = ["Dates", "PrecompileTools", "UUIDs"] +git-tree-sha1 = "7d2f8f21da5db6a806faf7b9b292296da42b2810" +uuid = "69de0a69-1ddd-5017-9359-2bf0b02dc9f0" +version = "2.8.3" + +[[deps.PooledArrays]] +deps = ["DataAPI", "Future"] +git-tree-sha1 = "36d8b4b899628fb92c2749eb488d884a926614d3" +uuid = "2dfb63ee-cc39-5dd5-95bd-886bf059d720" +version = "1.4.3" + +[[deps.PrecompileTools]] +deps = ["Preferences"] +git-tree-sha1 = "07a921781cab75691315adc645096ed5e370cb77" +uuid = "aea7be01-6a6a-4083-8856-8a6e6704d82a" +version = "1.3.3" + +[[deps.Preferences]] +deps = ["TOML"] +git-tree-sha1 = "522f093a29b31a93e34eaea17ba055d850edea28" +uuid = "21216c6a-2e73-6563-6e65-726566657250" +version = "1.5.1" + +[[deps.PrettyPrinting]] +git-tree-sha1 = "142ee93724a9c5d04d78df7006670a93ed1b244e" +uuid = "54e16d92-306c-5ea0-a30b-337be88ac337" +version = "0.4.2" + +[[deps.PrettyTables]] +deps = ["Crayons", "LaTeXStrings", "Markdown", "PrecompileTools", "Printf", "REPL", "Reexport", "StringManipulation", "Tables"] +git-tree-sha1 = "c5a07210bd060d6a8491b0ccdee2fa0235fc00bf" +uuid = "08abe8d2-0d0c-5749-adfa-8a2ac140af0d" +version = "3.1.2" + +[[deps.Printf]] +deps = ["Unicode"] +uuid = "de0858da-6303-5e67-8744-51eddeeeb8d7" +version = "1.11.0" + +[[deps.PtrArrays]] +git-tree-sha1 = "1d36ef11a9aaf1e8b74dacc6a731dd1de8fd493d" +uuid = "43287f4e-b6f4-7ad1-bb20-aadabca52c3d" +version = "1.3.0" + +[[deps.QuadGK]] +deps = ["DataStructures", "LinearAlgebra"] +git-tree-sha1 = "9da16da70037ba9d701192e27befedefb91ec284" +uuid = "1fd47b50-473d-5c70-9696-f719f8f3bcdc" +version = "2.11.2" + + 
[deps.QuadGK.extensions] + QuadGKEnzymeExt = "Enzyme" + + [deps.QuadGK.weakdeps] + Enzyme = "7da242da-08ed-463a-9acd-ee780be4f1d9" + +[[deps.REPL]] +deps = ["InteractiveUtils", "JuliaSyntaxHighlighting", "Markdown", "Sockets", "StyledStrings", "Unicode"] +uuid = "3fa0cd96-eef1-5676-8a61-b3b8758bbffb" +version = "1.11.0" + +[[deps.Random]] +deps = ["SHA"] +uuid = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" +version = "1.11.0" + +[[deps.Reexport]] +git-tree-sha1 = "45e428421666073eab6f2da5c9d310d99bb12f9b" +uuid = "189a3867-3050-52da-a836-e630ba90ab69" +version = "1.2.2" + +[[deps.Revise]] +deps = ["CodeTracking", "FileWatching", "InteractiveUtils", "JuliaInterpreter", "LibGit2", "LoweredCodeUtils", "OrderedCollections", "Preferences", "REPL", "UUIDs"] +git-tree-sha1 = "14d1bfb0a30317edc77e11094607ace3c800f193" +uuid = "295af30f-e4ad-537b-8983-00126c2a3abe" +version = "3.13.2" + + [deps.Revise.extensions] + DistributedExt = "Distributed" + + [deps.Revise.weakdeps] + Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" + +[[deps.Rmath]] +deps = ["Random", "Rmath_jll"] +git-tree-sha1 = "5b3d50eb374cea306873b371d3f8d3915a018f0b" +uuid = "79098fc4-a85e-5d69-aa6a-4863f24498fa" +version = "0.9.0" + +[[deps.Rmath_jll]] +deps = ["Artifacts", "JLLWrappers", "Libdl"] +git-tree-sha1 = "58cdd8fb2201a6267e1db87ff148dd6c1dbd8ad8" +uuid = "f50d1b31-88e8-58de-be2c-1cc44531875f" +version = "0.5.1+0" + +[[deps.SHA]] +uuid = "ea8e919c-243c-51af-8825-aaa63cd721ce" +version = "0.7.0" + +[[deps.ScopedValues]] +deps = ["HashArrayMappedTries", "Logging"] +git-tree-sha1 = "c3b2323466378a2ba15bea4b2f73b081e022f473" +uuid = "7e506255-f358-4e82-b7e4-beb19740aa63" +version = "1.5.0" + +[[deps.Scratch]] +deps = ["Dates"] +git-tree-sha1 = "9b81b8393e50b7d4e6d0a9f14e192294d3b7c109" +uuid = "6c6a2e73-6563-6170-7368-637461726353" +version = "1.3.0" + +[[deps.SentinelArrays]] +deps = ["Dates", "Random"] +git-tree-sha1 = "ebe7e59b37c400f694f52b58c93d26201387da70" +uuid = 
"91c51154-3ec4-41a3-a24f-3f23e20d615c" +version = "1.4.9" + +[[deps.Serialization]] +uuid = "9e88b42a-f829-5b0c-bbe9-9e923198166b" +version = "1.11.0" + +[[deps.SimpleBufferStream]] +git-tree-sha1 = "f305871d2f381d21527c770d4788c06c097c9bc1" +uuid = "777ac1f9-54b0-4bf8-805c-2214025038e7" +version = "1.2.0" + +[[deps.Sockets]] +uuid = "6462fe0b-24de-5631-8697-dd941f90decc" +version = "1.11.0" + +[[deps.Sodium]] +deps = ["Base64", "libsodium_jll"] +git-tree-sha1 = "907703e0d50846f300650d7225bdcab145b7bca9" +uuid = "4f5b5e99-b0ad-42cd-b47a-334e172ec8bd" +version = "1.1.2" + +[[deps.SortingAlgorithms]] +deps = ["DataStructures"] +git-tree-sha1 = "64d974c2e6fdf07f8155b5b2ca2ffa9069b608d9" +uuid = "a2af1166-a08f-5f64-846c-94a0d3cef48c" +version = "1.2.2" + +[[deps.SparseArrays]] +deps = ["Libdl", "LinearAlgebra", "Random", "Serialization", "SuiteSparse_jll"] +uuid = "2f01184e-e22b-5df5-ae63-d93ebab69eaf" +version = "1.12.0" + +[[deps.SpecialFunctions]] +deps = ["IrrationalConstants", "LogExpFunctions", "OpenLibm_jll", "OpenSpecFun_jll"] +git-tree-sha1 = "f2685b435df2613e25fc10ad8c26dddb8640f547" +uuid = "276daf66-3868-5448-9aa4-cd146d93841b" +version = "2.6.1" + + [deps.SpecialFunctions.extensions] + SpecialFunctionsChainRulesCoreExt = "ChainRulesCore" + + [deps.SpecialFunctions.weakdeps] + ChainRulesCore = "d360d2e6-b24c-11e9-a2a3-2a2ae2dbcce4" + +[[deps.Statistics]] +deps = ["LinearAlgebra"] +git-tree-sha1 = "ae3bb1eb3bba077cd276bc5cfc337cc65c3075c0" +uuid = "10745b16-79ce-11e8-11f9-7d13ad32a3b2" +version = "1.11.1" +weakdeps = ["SparseArrays"] + + [deps.Statistics.extensions] + SparseArraysExt = ["SparseArrays"] + +[[deps.StatsAPI]] +deps = ["LinearAlgebra"] +git-tree-sha1 = "178ed29fd5b2a2cfc3bd31c13375ae925623ff36" +uuid = "82ae8749-77ed-4fe6-ae5f-f523153014b0" +version = "1.8.0" + +[[deps.StatsBase]] +deps = ["AliasTables", "DataAPI", "DataStructures", "IrrationalConstants", "LinearAlgebra", "LogExpFunctions", "Missings", "Printf", "Random", "SortingAlgorithms", 
"SparseArrays", "Statistics", "StatsAPI"] +git-tree-sha1 = "aceda6f4e598d331548e04cc6b2124a6148138e3" +uuid = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91" +version = "0.34.10" + +[[deps.StatsFuns]] +deps = ["HypergeometricFunctions", "IrrationalConstants", "LogExpFunctions", "Reexport", "Rmath", "SpecialFunctions"] +git-tree-sha1 = "91f091a8716a6bb38417a6e6f274602a19aaa685" +uuid = "4c63d2b9-4356-54db-8cca-17b64c39e42c" +version = "1.5.2" + + [deps.StatsFuns.extensions] + StatsFunsChainRulesCoreExt = "ChainRulesCore" + StatsFunsInverseFunctionsExt = "InverseFunctions" + + [deps.StatsFuns.weakdeps] + ChainRulesCore = "d360d2e6-b24c-11e9-a2a3-2a2ae2dbcce4" + InverseFunctions = "3587e190-3f89-42d0-90ee-14403ec27112" + +[[deps.StringManipulation]] +deps = ["PrecompileTools"] +git-tree-sha1 = "a3c1536470bf8c5e02096ad4853606d7c8f62721" +uuid = "892a3eda-7b42-436c-8928-eab12a02cf0e" +version = "0.4.2" + +[[deps.StringViews]] +git-tree-sha1 = "f2dcb92855b31ad92fe8f079d4f75ac57c93e4b8" +uuid = "354b36f9-a18e-4713-926e-db85100087ba" +version = "1.3.7" + +[[deps.StructTypes]] +deps = ["Dates", "UUIDs"] +git-tree-sha1 = "159331b30e94d7b11379037feeb9b690950cace8" +uuid = "856f2bd8-1eba-4b0a-8007-ebc267875bd4" +version = "1.11.0" + +[[deps.StructUtils]] +deps = ["Dates", "UUIDs"] +git-tree-sha1 = "9297459be9e338e546f5c4bedb59b3b5674da7f1" +uuid = "ec057cc2-7a8d-4b58-b3b3-92acb9f63b42" +version = "2.6.2" + + [deps.StructUtils.extensions] + StructUtilsMeasurementsExt = ["Measurements"] + StructUtilsTablesExt = ["Tables"] + + [deps.StructUtils.weakdeps] + Measurements = "eff96d63-e80a-5855-80a2-b1b0885c5ab7" + Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c" + +[[deps.StyledStrings]] +uuid = "f489334b-da3d-4c2e-b8f0-e476e12c162b" +version = "1.11.0" + +[[deps.SuiteSparse]] +deps = ["Libdl", "LinearAlgebra", "Serialization", "SparseArrays"] +uuid = "4607b0f0-06f3-5cda-b6b1-a6196a1729e9" + +[[deps.SuiteSparse_jll]] +deps = ["Artifacts", "Libdl", "libblastrampoline_jll"] +uuid = 
"bea87d4a-7f5b-5778-9afe-8cc45184846c" +version = "7.8.3+2" + +[[deps.TOML]] +deps = ["Dates"] +uuid = "fa267f1f-6049-4f14-aa54-33bafae1ed76" +version = "1.0.3" + +[[deps.TZJData]] +deps = ["Artifacts"] +git-tree-sha1 = "72df96b3a595b7aab1e101eb07d2a435963a97e2" +uuid = "dc5dba14-91b3-4cab-a142-028a31da12f7" +version = "1.5.0+2025b" + +[[deps.TableTraits]] +deps = ["IteratorInterfaceExtensions"] +git-tree-sha1 = "c06b2f539df1c6efa794486abfb6ed2022561a39" +uuid = "3783bdb8-4a98-5b6b-af9a-565f29a5fe9c" +version = "1.0.1" + +[[deps.Tables]] +deps = ["DataAPI", "DataValueInterfaces", "IteratorInterfaceExtensions", "OrderedCollections", "TableTraits"] +git-tree-sha1 = "f2c1efbc8f3a609aadf318094f8fc5204bdaf344" +uuid = "bd369af6-aec1-5ad0-b16a-f7cc5008161c" +version = "1.12.1" + +[[deps.Test]] +deps = ["InteractiveUtils", "Logging", "Random", "Serialization"] +uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40" +version = "1.11.0" + +[[deps.TimeZones]] +deps = ["Artifacts", "Dates", "Downloads", "InlineStrings", "Mocking", "Printf", "Scratch", "TZJData", "Unicode", "p7zip_jll"] +git-tree-sha1 = "d422301b2a1e294e3e4214061e44f338cafe18a2" +uuid = "f269a46b-ccf7-5d73-abea-4c690281aa53" +version = "1.22.2" + + [deps.TimeZones.extensions] + TimeZonesRecipesBaseExt = "RecipesBase" + + [deps.TimeZones.weakdeps] + RecipesBase = "3cdcf5f2-1ef4-517c-9805-6587b60abb01" + +[[deps.TranscodingStreams]] +git-tree-sha1 = "0c45878dcfdcfa8480052b6ab162cdd138781742" +uuid = "3bb67fe8-82b1-5028-8e26-92a6c54297fa" +version = "0.11.3" + +[[deps.URIs]] +git-tree-sha1 = "bef26fb046d031353ef97a82e3fdb6afe7f21b1a" +uuid = "5c2747f8-b7ea-4ff2-ba2e-563bfd36b1d4" +version = "1.6.1" + +[[deps.UUIDs]] +deps = ["Random", "SHA"] +uuid = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" +version = "1.11.0" + +[[deps.Unicode]] +uuid = "4ec0a83e-493e-50e2-b9ac-8f72acf5a8f5" +version = "1.11.0" + +[[deps.WeakRefStrings]] +deps = ["DataAPI", "InlineStrings", "Parsers"] +git-tree-sha1 = 
"b1be2855ed9ed8eac54e5caff2afcdb442d52c23" +uuid = "ea10d353-3f73-51f8-a26c-33c1cb351aa5" +version = "1.4.2" + +[[deps.WorkerUtilities]] +git-tree-sha1 = "cd1659ba0d57b71a464a29e64dbc67cfe83d54e7" +uuid = "76eceee3-57b5-4d4a-8e66-0e911cebbf60" +version = "1.6.1" + +[[deps.Zlib_jll]] +deps = ["Libdl"] +uuid = "83775a58-1f1d-513f-b197-d71354ab007a" +version = "1.3.1+2" + +[[deps.Zstd_jll]] +deps = ["Artifacts", "JLLWrappers", "Libdl"] +git-tree-sha1 = "446b23e73536f84e8037f5dce465e92275f6a308" +uuid = "3161d3a3-bdf6-5164-811a-617609db77b4" +version = "1.5.7+1" + +[[deps.libblastrampoline_jll]] +deps = ["Artifacts", "Libdl"] +uuid = "8e850b90-86db-534c-a0d3-1478176c7d93" +version = "5.15.0+0" + +[[deps.libsodium_jll]] +deps = ["Artifacts", "JLLWrappers", "Libdl"] +git-tree-sha1 = "011b0a7331b41c25524b64dc42afc9683ee89026" +uuid = "a9144af2-ca23-56d9-984f-0d03f7b5ccf8" +version = "1.0.21+0" + +[[deps.nghttp2_jll]] +deps = ["Artifacts", "Libdl"] +uuid = "8e850ede-7688-5339-a07c-302acd2aaf8d" +version = "1.64.0+1" + +[[deps.p7zip_jll]] +deps = ["Artifacts", "CompilerSupportLibraries_jll", "Libdl"] +uuid = "3f19e933-33d8-53b3-aaab-bd5110c3b7a0" +version = "17.7.0+0" diff --git a/Plik_API_document.md b/Plik_API_document.md new file mode 100644 index 0000000..bfef4a4 --- /dev/null +++ b/Plik_API_document.md @@ -0,0 +1,194 @@ +### API +Plik server expose a REST-full API to manage uploads and get files : + +Get and create upload : + + - **POST** /upload + - Params (json object in request body) : + - oneshot (bool) + - stream (bool) + - removable (bool) + - ttl (int) + - login (string) + - password (string) + - files (see below) + - Return : + JSON formatted upload object. + Important fields : + - id (required to upload files) + - uploadToken (required to upload/remove files) + - files (see below) + + For stream mode you need to know the file id before the upload starts as it will block. 
+ File size and/or file type also need to be known before the upload starts as they have to be printed + in HTTP response headers. + To get the file ids pass a "files" json object with each file you are about to upload. + Fill the reference field with an arbitrary string to avoid matching file ids using the fileName field. + This is also used to notify of MISSING files when file upload is not yet finished or has failed. + ``` + "files" : [ + { + "fileName": "file.txt", + "fileSize": 12345, + "fileType": "text/plain", + "reference": "0" + },... + ] + ``` + + - **GET** /upload/:uploadid: + - Get upload metadata (files list, upload date, ttl,...) + +Upload file : + + - **POST** /$mode/:uploadid:/:fileid:/:filename: + - Request body must be a multipart request with a part named "file" containing file data. + + - **POST** /file/:uploadid: + - Same as above without passing file id, won't work for stream mode. + + - **POST** /: + - Quick mode, automatically create an upload with default parameters and add the file to it. + +Get file : + + - **HEAD** /$mode/:uploadid:/:fileid:/:filename: + - Returns only HTTP headers. Useful to know Content-Type and Content-Length without downloading the file. Especially if upload has OneShot option enabled. + + - **GET** /$mode/:uploadid:/:fileid:/:filename: + - Download file. Filename **MUST** match. A browser, might try to display the file if it's a jpeg for example. You may try to force download with ?dl=1 in url. + + - **GET** /archive/:uploadid:/:filename: + - Download uploaded files in a zip archive. :filename: must end with .zip + +Remove file : + + - **DELETE** /$mode/:uploadid:/:fileid:/:filename: + - Delete file. Upload **MUST** have "removable" option enabled. + +Show server details : + + - **GET** /version + - Show plik server version, and some build information (build host, date, git revision,...) + + - **GET** /config + - Show plik server configuration (ttl values, max file size, ...) 
+ + - **GET** /stats + - Get server statistics ( upload/file count, user count, total size used ) + - Admin only + +User authentication : + + - + Plik can authenticate users using Google and/or OVH third-party API. + The /auth API is designed for the Plik web application nevertheless if you want to automatize it be sure to provide a valid + Referrer HTTP header and forward all session cookies. + Plik session cookies have the "secure" flag set, so they can only be transmitted over secure HTTPS connections. + To avoid CSRF attacks the value of the plik-xsrf cookie MUST be copied in the X-XSRFToken HTTP header of each + authenticated request. + Once authenticated a user can generate upload tokens. Those tokens can be used in the X-PlikToken HTTP header used to link + an upload to the user account. It can be put in the ~/.plikrc file of the Plik command line client. + + - **Local** : + - You'll need to create users using the server command line + + - **Google** : + - You'll need to create a new application in the [Google Developper Console](https://console.developers.google.com) + - You'll be handed a Google API ClientID and a Google API ClientSecret that you'll need to put in the plikd.cfg file + - Do not forget to whitelist valid origin and redirect url ( https://yourdomain/auth/google/callback ) for your domain + + - **OVH** : + - You'll need to create a new application in the OVH API : https://eu.api.ovh.com/createApp/ + - You'll be handed an OVH application key and an OVH application secret key that you'll need to put in the plikd.cfg file + + - **GET** /auth/google/login + - Get Google user consent URL. User have to visit this URL to authenticate + + - **GET** /auth/google/callback + - Callback of the user consent dialog + - The user will be redirected back to the web application with a Plik session cookie at the end of this call + + - **GET** /auth/ovh/login + - Get OVH user consent URL. 
The user has to visit this URL to authenticate
+ +Examples : +```sh +Create an upload (in the json response, you'll have upload id and upload token) +$ curl -X POST http://127.0.0.1:8080/upload + +Create a OneShot upload +$ curl -X POST -d '{ "OneShot" : true }' http://127.0.0.1:8080/upload + +Upload a file to upload +$ curl -X POST --header "X-UploadToken: M9PJftiApG1Kqr81gN3Fq1HJItPENMhl" -F "file=@test.txt" http://127.0.0.1:8080/file/IsrIPIsDskFpN12E + +Get headers +$ curl -I http://127.0.0.1:8080/file/IsrIPIsDskFpN12E/sFjIeokH23M35tN4/test.txt +HTTP/1.1 200 OK +Content-Disposition: filename=test.txt +Content-Length: 3486 +Content-Type: text/plain; charset=utf-8 +Date: Fri, 15 May 2015 09:16:20 GMT + +``` \ No newline at end of file diff --git a/Project.toml b/Project.toml new file mode 100644 index 0000000..e5188aa --- /dev/null +++ b/Project.toml @@ -0,0 +1,8 @@ +[deps] +Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45" +Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" +GeneralUtils = "c6c72f09-b708-4ac8-ac7c-2084d70108fe" +HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" +JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" +NATS = "55e73f9c-eeeb-467f-b4cc-a633fde63d2a" +UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" diff --git a/architecture.md b/architecture.md new file mode 100644 index 0000000..4068afc --- /dev/null +++ b/architecture.md @@ -0,0 +1,294 @@ +# Architecture Documentation: Bi-Directional Data Bridge (Julia ↔ JavaScript) + +## Overview + +This document describes the architecture for a high-performance, bi-directional data bridge between a Julia service and a JavaScript (Node.js) service using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads. 
+ +## Architecture Diagram + +```mermaid +flowchart TD + subgraph Client + JS[JavaScript Client] + JSApp[Application Logic] + end + + subgraph Server + Julia[Julia Service] + NATS[NATS Server] + FileServer[HTTP File Server] + end + + JS -->|Control/Small Data| JSApp + JSApp -->|NATS| NATS + NATS -->|NATS| Julia + Julia -->|NATS| NATS + Julia -->|HTTP POST| FileServer + JS -->|HTTP GET| FileServer + + style JS fill:#e1f5fe + style Julia fill:#e8f5e9 + style NATS fill:#fff3e0 + style FileServer fill:#f3e5f5 +``` + +## System Components + +### 1. Unified JSON Envelope Schema + +All messages use a standardized envelope format: + +```json +{ + "correlation_id": "uuid-v4-string", + "type": "json|table|binary", + "transport": "direct|link", + "payload": "base64-encoded-string", // Only if transport=direct + "url": "http://fileserver/path/to/data", // Only if transport=link + "metadata": { + "content_type": "application/octet-stream", + "content_length": 123456, + "format": "arrow_ipc_stream" + } +} +``` + +### 2. Transport Strategy Decision Logic + +``` +┌─────────────────────────────────────────────────────────────┐ +│ SmartSend Function │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Is payload size < 1MB? │ +└─────────────────────────────────────────────────────────────┘ + │ + ┌─────────────────┴─────────────────┐ + ▼ ▼ + ┌─────────────────┐ ┌─────────────────┐ + │ Direct Path │ │ Link Path │ + │ (< 1MB) │ │ (> 1MB) │ + │ │ │ │ + │ • Serialize to │ │ • Serialize to │ + │ IOBuffer │ │ IOBuffer │ + │ • Base64 encode │ │ • Upload to │ + │ • Publish to │ │ HTTP Server │ + │ NATS │ │ • Publish to │ + │ │ │ NATS with URL │ + └─────────────────┘ └─────────────────┘ +``` + +### 3. 
Julia Module Architecture + +```mermaid +graph TD + subgraph JuliaModule + SmartSendJulia[SmartSend Julia] + SizeCheck[Size Check] + DirectPath[Direct Path] + LinkPath[Link Path] + HTTPClient[HTTP Client] + end + + SmartSendJulia --> SizeCheck + SizeCheck -->|< 1MB| DirectPath + SizeCheck -->|>= 1MB| LinkPath + LinkPath --> HTTPClient + + style JuliaModule fill:#c5e1a5 +``` + +### 4. JavaScript Module Architecture + +```mermaid +graph TD + subgraph JSModule + SmartSendJS[SmartSend JS] + SmartReceiveJS[SmartReceive JS] + JetStreamConsumer[JetStream Pull Consumer] + ApacheArrow[Apache Arrow] + end + + SmartSendJS --> NATS + SmartReceiveJS --> JetStreamConsumer + JetStreamConsumer --> ApacheArrow + + style JSModule fill:#f3e5f5 +``` + +## Implementation Details + +### Julia Implementation + +#### Dependencies +- `NATS.jl` - Core NATS functionality +- `Arrow.jl` - Arrow IPC serialization +- `JSON3.jl` - JSON parsing +- `HTTP.jl` - HTTP client for file server +- `Dates.jl` - Timestamps for logging + +#### SmartSend Function + +```julia +function SmartSend( + subject::String, + data::Any, + type::String = "json"; + nats_url::String = "nats://localhost:4222", + fileserver_url::String = "http://localhost:8080/upload", + size_threshold::Int = 1_000_000 # 1MB +) +``` + +**Flow:** +1. Serialize data to Arrow IPC stream (if table) +2. Check payload size +3. If < threshold: publish directly to NATS with Base64-encoded payload +4. 
If >= threshold: upload to HTTP server, publish NATS with URL + +#### SmartReceive Handler + +```julia +function SmartReceive(msg::NATS.Message) + # Parse envelope + # Check transport type + # If direct: decode Base64 payload + # If link: fetch from URL with exponential backoff + # Deserialize Arrow IPC to DataFrame +end +``` + +### JavaScript Implementation + +#### Dependencies +- `nats.js` - Core NATS functionality +- `apache-arrow` - Arrow IPC serialization +- `uuid` - Correlation ID generation + +#### SmartSend Function + +```javascript +async function SmartSend(subject, data, type = 'json', options = {}) +``` + +**Flow:** +1. Serialize data to Arrow IPC buffer (if table) +2. Check payload size +3. If < threshold: publish directly to NATS +4. If >= threshold: upload to HTTP server, publish NATS with URL + +#### SmartReceive Handler + +```javascript +async function SmartReceive(msg, options = {}) +``` + +**Flow:** +1. Parse envelope +2. Check transport type +3. If direct: decode Base64 payload +4. If link: fetch with exponential backoff +5. 
Deserialize Arrow IPC with zero-copy + +## Scenario Implementations + +### Scenario 1: Command & Control (Small JSON) + +**Julia (Receiver):** +```julia +# Subscribe to control subject +# Parse JSON envelope +# Execute simulation with parameters +# Send acknowledgment +``` + +**JavaScript (Sender):** +```javascript +// Create small JSON config +// Send via SmartSend with type="json" +``` + +### Scenario 2: Deep Dive Analysis (Large Arrow Table) + +**Julia (Sender):** +```julia +# Create large DataFrame +# Convert to Arrow IPC stream +# Check size (> 1MB) +# Upload to HTTP server +# Publish NATS with URL +``` + +**JavaScript (Receiver):** +```javascript +// Receive NATS message with URL +// Fetch data from HTTP server +// Parse Arrow IPC with zero-copy +// Load into Perspective.js or D3 +``` + +### Scenario 3: Live Audio Processing + +**JavaScript (Sender):** +```javascript +// Capture audio chunk +// Send as binary with metadata headers +// Use SmartSend with type="audio" +``` + +**Julia (Receiver):** +```julia +// Receive audio data +// Perform FFT or AI transcription +// Send results back (JSON + Arrow table) +``` + +### Scenario 4: Catch-Up (JetStream) + +**Julia (Producer):** +```julia +# Publish to JetStream +# Include metadata for temporal tracking +``` + +**JavaScript (Consumer):** +```javascript +// Connect to JetStream +// Request replay from last 10 minutes +// Process historical and real-time messages +``` + +## Performance Considerations + +### Zero-Copy Reading +- Use Arrow's memory-mapped file reading +- Avoid unnecessary data copying during deserialization +- Use Apache Arrow's native IPC reader + +### Exponential Backoff +- Implement exponential backoff for HTTP link fetching +- Maximum retry count: 5 +- Base delay: 100ms, max delay: 5000ms + +### Correlation ID Logging +- Log correlation_id at every stage +- Include: send, receive, serialize, deserialize +- Use structured logging format + +## Testing Strategy + +### Unit Tests +- Test SmartSend 
with various payload sizes +- Test SmartReceive with direct and link transport +- Test Arrow IPC serialization/deserialization + +### Integration Tests +- Test full flow with NATS server +- Test large data transfer (> 100MB) +- Test audio processing pipeline + +### Performance Tests +- Measure throughput for small payloads +- Measure throughput for large payloads diff --git a/docs/IMPLEMENTATION.md b/docs/IMPLEMENTATION.md new file mode 100644 index 0000000..6f8a767 --- /dev/null +++ b/docs/IMPLEMENTATION.md @@ -0,0 +1,321 @@ +# Implementation Guide: Bi-Directional Data Bridge + +## Overview + +This document describes the implementation of the high-performance, bi-directional data bridge between Julia and JavaScript services using NATS (Core & JetStream), implementing the Claim-Check pattern for large payloads. + +## Architecture + +The implementation follows the Claim-Check pattern: + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ SmartSend Function │ +└─────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────┐ +│ Is payload size < 1MB? 
│ +└─────────────────────────────────────────────────────────────────────────┘ + │ + ┌─────────────────┴─────────────────┐ + ▼ ▼ + ┌─────────────────┐ ┌─────────────────┐ + │ Direct Path │ │ Link Path │ + │ (< 1MB) │ │ (> 1MB) │ + │ │ │ │ + │ • Serialize to │ │ • Serialize to │ + │ IOBuffer │ │ IOBuffer │ + │ • Base64 encode │ │ • Upload to │ + │ • Publish to │ │ HTTP Server │ + │ NATS │ │ • Publish to │ + │ │ │ NATS with URL │ + └─────────────────┘ └─────────────────┘ +``` + +## Files + +### Julia Module: [`src/julia_bridge.jl`](../src/julia_bridge.jl) + +The Julia implementation provides: + +- **[`MessageEnvelope`](../src/julia_bridge.jl)**: Struct for the unified JSON envelope +- **[`SmartSend()`](../src/julia_bridge.jl)**: Handles transport selection based on payload size +- **[`SmartReceive()`](../src/julia_bridge.jl)**: Handles both direct and link transport + +### JavaScript Module: [`src/js_bridge.js`](../src/js_bridge.js) + +The JavaScript implementation provides: + +- **`MessageEnvelope` class**: For the unified JSON envelope +- **[`SmartSend()`](../src/js_bridge.js)**: Handles transport selection based on payload size +- **[`SmartReceive()`](../src/js_bridge.js)**: Handles both direct and link transport + +## Installation + +### Julia Dependencies + +```julia +using Pkg +Pkg.add("NATS") +Pkg.add("Arrow") +Pkg.add("JSON3") +Pkg.add("HTTP") +Pkg.add("UUIDs") +Pkg.add("Dates") +``` + +### JavaScript Dependencies + +```bash +npm install nats.js apache-arrow uuid base64-url +``` + +## Usage Tutorial + +### Step 1: Start NATS Server + +```bash +docker run -p 4222:4222 nats:latest +``` + +### Step 2: Start HTTP File Server (optional) + +```bash +# Create a directory for file uploads +mkdir -p /tmp/fileserver + +# Use any HTTP server that supports POST for file uploads +# Example: Python's built-in server +python3 -m http.server 8080 --directory /tmp/fileserver +``` + +### Step 3: Run Test Scenarios + +```bash +# Scenario 1: Command & Control (JavaScript sender) 
+node test/scenario1_command_control.js + +# Scenario 2: Large Arrow Table (JavaScript sender) +node test/scenario2_large_table.js + +# Scenario 3: Julia-to-Julia communication +# Run both Julia and JavaScript versions +julia test/scenario3_julia_to_julia.jl +node test/scenario3_julia_to_julia.js +``` + +## Usage + +### Scenario 1: Command & Control (Small JSON) + +#### JavaScript (Sender) +```javascript +const { SmartSend } = require('./js_bridge'); + +const config = { + step_size: 0.01, + iterations: 1000 +}; + +await SmartSend("control", config, "json", { + correlationId: "unique-id" +}); +``` + +#### Julia (Receiver) +```julia +using NATS +using JSON3 + +# Subscribe to control subject +subscribe(nats, "control") do msg + env = MessageEnvelope(String(msg.data)) + config = JSON3.read(env.payload) + + # Execute simulation with parameters + step_size = config.step_size + iterations = config.iterations + + # Send acknowledgment + response = Dict("status" => "Running", "correlation_id" => env.correlation_id) + publish(nats, "control_response", JSON3.stringify(response)) +end +``` + +### Scenario 2: Deep Dive Analysis (Large Arrow Table) + +#### Julia (Sender) +```julia +using Arrow +using DataFrames + +# Create large DataFrame +df = DataFrame( + id = 1:10_000_000, + value = rand(10_000_000), + category = rand(["A", "B", "C"], 10_000_000) +) + +# Send via SmartSend with type="table" +await SmartSend("analysis_results", df, "table"); +``` + +#### JavaScript (Receiver) +```javascript +const { SmartReceive } = require('./js_bridge'); + +const result = await SmartReceive(msg); + +// Use table data for visualization with Perspective.js or D3 +const table = result.data; +``` + +### Scenario 3: Live Binary Processing + +#### JavaScript (Sender) +```javascript +const { SmartSend } = require('./js_bridge'); + +// Capture binary chunk +const binaryData = await navigator.mediaDevices.getUserMedia({ binary: true }); + +await SmartSend("binary_input", binaryData, "binary", { + 
metadata: { + sample_rate: 44100, + channels: 1 + } +}); +``` + +#### Julia (Receiver) +```julia +using WAV +using DSP + +# Receive binary data +function process_binary(data) + # Perform FFT or AI transcription + spectrum = fft(data) + + # Send results back (JSON + Arrow table) + results = Dict("transcription" => "sample text", "spectrum" => spectrum) + await SmartSend("binary_output", results, "json") +end +``` + +### Scenario 4: Catch-Up (JetStream) + +#### Julia (Producer) +```julia +using NATS + +function publish_health_status(nats) + jetstream = JetStream(nats, "health_updates") + + while true + status = Dict("cpu" => rand(), "memory" => rand()) + publish(jetstream, "health", status) + sleep(5) # Every 5 seconds + end +end +``` + +#### JavaScript (Consumer) +```javascript +const { connect } = require('nats'); + +const nc = await connect({ servers: ['nats://localhost:4222'] }); +const js = nc.jetstream(); + +// Request replay from last 10 minutes +const consumer = await js.pullSubscribe("health", { + durable_name: "catchup", + max_batch: 100, + max_ack_wait: 30000 +}); + +// Process historical and real-time messages +for await (const msg of consumer) { + const result = await SmartReceive(msg); + // Process the data + msg.ack(); +} +``` + +## Configuration + +### Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `NATS_URL` | `nats://localhost:4222` | NATS server URL | +| `FILESERVER_URL` | `http://localhost:8080/upload` | HTTP file server URL | +| `SIZE_THRESHOLD` | `1_000_000` | Size threshold in bytes (1MB) | + +### Message Envelope Schema + +```json +{ + "correlation_id": "uuid-v4-string", + "type": "json|table|binary", + "transport": "direct|link", + "payload": "base64-encoded-string", // Only if transport=direct + "url": "http://fileserver/path/to/data", // Only if transport=link + "metadata": { + "content_type": "application/octet-stream", + "content_length": 123456, + "format": "arrow_ipc_stream" + } +} 
+``` + +## Performance Considerations + +### Zero-Copy Reading +- Use Arrow's memory-mapped file reading +- Avoid unnecessary data copying during deserialization +- Use Apache Arrow's native IPC reader + +### Exponential Backoff +- Maximum retry count: 5 +- Base delay: 100ms, max delay: 5000ms +- Implemented in both Julia and JavaScript implementations + +### Correlation ID Logging +- Log correlation_id at every stage +- Include: send, receive, serialize, deserialize +- Use structured logging format + +## Testing + +Run the test scripts: + +```bash +# Scenario 1: Command & Control (JavaScript sender) +node test/scenario1_command_control.js + +# Scenario 2: Large Arrow Table (JavaScript sender) +node test/scenario2_large_table.js +``` + +## Troubleshooting + +### Common Issues + +1. **NATS Connection Failed** + - Ensure NATS server is running + - Check NATS_URL configuration + +2. **HTTP Upload Failed** + - Ensure file server is running + - Check FILESERVER_URL configuration + - Verify upload permissions + +3. **Arrow IPC Deserialization Error** + - Ensure data is properly serialized to Arrow format + - Check Arrow version compatibility + +## License + +MIT \ No newline at end of file diff --git a/etc.jl b/etc.jl new file mode 100644 index 0000000..4b9b847 --- /dev/null +++ b/etc.jl @@ -0,0 +1,42 @@ + +""" fileServerURL = "http://192.168.88.104:8080" +filepath = "/home/ton/docker-apps/sendreceive/image/test.zip" +filename = basename(filepath) +filebytes = read(filepath) + +plik_oneshot_upload - Upload a single file to a plik server using one-shot mode + +This function uploads a raw byte array to a plik server in one-shot mode (no upload session). +It first creates a one-shot upload session by sending a POST request with `{"OneShot": true}`, +retrieves an upload ID and token, then uploads the file data as multipart form data using the token. + +The function handles the entire flow: +1. Obtains an upload ID and token from the server +2. 
Uploads the provided binary data as a file using the `X-UploadToken` header +3. Returns identifiers and download URL for the uploaded file + +# Arguments: + - `fileServerURL::String` - Base URL of the plik server (e.g., `"http://192.168.88.104:8080"`) + - `filename::String` - Name of the file being uploaded + - `data::Vector{UInt8}` - Raw byte data of the file content + +# Return: + - A named tuple with fields: + - `uploadid::String` - ID of the one-shot upload session + - `fileid::String` - ID of the uploaded file within the session + - `downloadurl::String` - Full URL to download the uploaded file + +# Example +```jldoctest +using HTTP, JSON + +# Example data: "Hello" as bytes +data = collect("Hello World!" |> collect |> CodeUnits |> collect) + +# Upload to local plik server +result = plik_oneshot_upload("http://192.168.88.104:8080", "hello.txt", data) + +# Download URL for the uploaded file +println(result.downloadurl) +``` +""" \ No newline at end of file diff --git a/largefile.zip b/largefile.zip new file mode 100644 index 0000000..56dfcd1 Binary files /dev/null and b/largefile.zip differ diff --git a/src/NATSBridge.jl b/src/NATSBridge.jl new file mode 100644 index 0000000..902613e --- /dev/null +++ b/src/NATSBridge.jl @@ -0,0 +1,669 @@ +# Bi-Directional Data Bridge - Julia Module +# Implements smartsend and smartreceive for NATS communication +# This module provides functionality for sending and receiving data across network boundaries +# using NATS as the message bus, with support for both direct payload transport and +# URL-based transport for larger payloads. 
module NATSBridge

# Base64 is required for base64encode in smartsend (the original omitted it).
using NATS, JSON, Arrow, HTTP, UUIDs, Dates, Base64
# ---------------------------------------------- 100 --------------------------------------------- #

# Constants
const DEFAULT_SIZE_THRESHOLD = 1_000_000                       # 1MB - cutoff between direct and link transport
const DEFAULT_NATS_URL = "nats://localhost:4222"               # Default NATS server URL
const DEFAULT_FILESERVER_URL = "http://localhost:8080/upload"  # Default HTTP fileserver URL for link transport

"""
Unified JSON envelope carried by every bridge message.

Holds either the payload itself (`transport == "direct"`, Base64 string in
`payload`) or a claim-check reference (`transport == "link"`, download
location in `url`), plus free-form metadata about the payload.
"""
struct MessageEnvelope
    correlation_id::String           # Unique identifier to trace a message across systems
    type::String                     # Data type indicator: "json", "table", or "binary"
    transport::String                # Transport strategy: "direct" (Base64 bytes) or "link" (URL reference)
    payload::Union{String, Nothing}  # Base64-encoded payload when transport == "direct"
    url::Union{String, Nothing}      # Download URL when transport == "link"
    metadata::Dict{String, Any}      # Additional metadata about the payload (size, format, ...)
end

"""
Keyword constructor with defaults: a freshly generated UUID correlation id,
`type = "json"`, `transport = "direct"`, and empty payload/url/metadata.
"""
function MessageEnvelope(;
    correlation_id::String = string(uuid4()),
    type::String = "json",
    transport::String = "direct",
    payload::Union{String, Nothing} = nothing,
    url::Union{String, Nothing} = nothing,
    metadata::Dict{String, Any} = Dict{String, Any}()
)
    MessageEnvelope(correlation_id, type, transport, payload, url, metadata)
end

"""
Reconstruct a `MessageEnvelope` from its JSON wire form (see `envelope_to_json`).

NOTE: `JSON.parse` returns a `Dict{String, Any}`, so every field must be
looked up by String key. The original implementation used property access
(`data.correlation_id`) and Symbol keys (`haskey(data, :payload)`), both of
which fail on a Dict at runtime.
"""
function MessageEnvelope(json_str::String)
    data = JSON.parse(json_str)  # Dict{String, Any}

    metadata = Dict{String, Any}()
    if haskey(data, "metadata") && data["metadata"] !== nothing
        # Normalize keys to String so downstream lookups are uniform.
        metadata = Dict{String, Any}(String(k) => v for (k, v) in data["metadata"])
    end

    MessageEnvelope(
        correlation_id = String(data["correlation_id"]),
        type = String(data["type"]),
        transport = String(data["transport"]),
        payload = haskey(data, "payload") && data["payload"] !== nothing ? String(data["payload"]) : nothing,
        url = haskey(data, "url") && data["url"] !== nothing ? String(data["url"]) : nothing,
        metadata = metadata
    )
end

"""
Serialize a `MessageEnvelope` to its JSON wire form.

Optional fields (`payload`, `url`, and an empty `metadata`) are omitted so
the JSON stays minimal and unambiguous.
"""
function envelope_to_json(env::MessageEnvelope)
    obj = Dict{String, Any}(
        "correlation_id" => env.correlation_id,
        "type" => env.type,
        "transport" => env.transport
    )
    env.payload === nothing || (obj["payload"] = env.payload)
    env.url === nothing || (obj["url"] = env.url)
    isempty(env.metadata) || (obj["metadata"] = env.metadata)
    JSON.json(obj)
end

"""
Emit an info-level trace line tagged with the current timestamp and the
correlation id, so a single message can be followed across the distributed
pipeline.
"""
function log_trace(correlation_id::String, message::String)
    timestamp = Dates.now()
    @info "[$timestamp] [Correlation: $correlation_id] $message"
end

"""
    smartsend(subject, data, type = "json"; kwargs...) -> MessageEnvelope

Send `data` either directly via NATS or via a fileserver URL (Claim-Check
pattern), depending on the serialized payload size.

Workflow:
1. Serialize `data` according to `type` (`"json"`, `"table"`, or `"binary"`).
2. Compare the serialized size against `size_threshold`.
3. Small payloads: Base64-encode, build a `"direct"` envelope, publish to NATS.
4. Large payloads: upload via `fileServerUploadHandler`, build a `"link"`
   envelope carrying only the download URL, publish to NATS.

# Arguments
- `subject::String` - NATS subject to publish the message to
- `data::Any` - payload to send
- `type::String = "json"` - serialization format: `"json"`, `"table"`, or `"binary"`

# Keyword Arguments
- `dataname` - filename used when uploading to the fileserver (UUID by default)
- `nats_url::String` - NATS server URL
- `fileserver_url::String` - base URL of the fileserver
- `fileServerUploadHandler::Function` - upload handler; must return an object
  indexable by `:status` and `:url` (e.g. `plik_oneshot_upload`)
- `size_threshold::Int` - byte threshold separating direct vs. link transport
- `correlation_id` - optional trace id; a UUID is generated when `nothing`

# Returns
The published `MessageEnvelope` (for tracking/inspection by the caller).

# Throws
- `ErrorException` when the fileserver upload does not return status 200.
"""
function smartsend(
    subject::String,  # smartreceive's subject
    data::Any,
    type::String = "json";
    dataname = string(UUIDs.uuid4()),
    nats_url::String = DEFAULT_NATS_URL,
    fileserver_url::String = DEFAULT_FILESERVER_URL,
    fileServerUploadHandler::Function = plik_oneshot_upload,  # uploads bytes to a specific HTTP fileserver
    size_threshold::Int = DEFAULT_SIZE_THRESHOLD,
    correlation_id::Union{String, Nothing} = nothing
)
    # Create or reuse the trace id for this exchange.
    cid = correlation_id !== nothing ? correlation_id : string(uuid4())
    log_trace(cid, "Starting smartsend for subject: $subject")

    # Serialize data based on type, then measure it to pick the transport.
    payload_bytes = _serialize_data(data, type)
    payload_size = length(payload_bytes)
    log_trace(cid, "Serialized payload size: $payload_size bytes")

    # Describe the actual wire format. The original unconditionally claimed
    # "arrow_ipc_stream", which mislabelled json and binary payloads.
    format = type == "table" ? "arrow_ipc_stream" : type
    meta = Dict{String, Any}("content_length" => payload_size, "format" => format)

    if payload_size < size_threshold
        # Direct path - Base64 encode and ship inside the NATS message.
        log_trace(cid, "Using direct transport for $payload_size bytes")
        env = MessageEnvelope(
            correlation_id = cid,
            type = type,
            transport = "direct",
            payload = base64encode(payload_bytes),
            metadata = meta
        )
    else
        # Link path (claim check) - upload the bytes, publish only the URL.
        log_trace(cid, "Using link transport, uploading to fileserver")
        response = fileServerUploadHandler(fileserver_url, dataname, payload_bytes)
        response[:status] == 200 || error("Failed to upload data to fileserver: $(response[:status])")

        url = response[:url]
        log_trace(cid, "Uploaded to URL: $url")
        env = MessageEnvelope(
            correlation_id = cid,
            type = type,
            transport = "link",
            url = url,
            metadata = meta
        )
    end

    # Publish the envelope and return it so callers can track the exchange.
    msg_json = envelope_to_json(env)
    publish_message(nats_url, subject, msg_json, cid)
    return env
end
"""
    _serialize_data(data, type) -> Vector{UInt8}

Serialize `data` into bytes according to `type`:
- `"json"`   : JSON-encode `data` and return its UTF-8 bytes
- `"table"`  : write table-like `data` as an Apache Arrow IPC stream
- `"binary"` : pass through an `IOBuffer` or `Vector{UInt8}` unchanged

Throws an error for any other `type`, or when `type == "binary"` but `data`
is neither an `IOBuffer` nor a `Vector{UInt8}`.

# Example
```julia
_serialize_data(Dict("name" => "Alice"), "json")    # JSON bytes
_serialize_data(df, "table")                        # Arrow IPC stream bytes
_serialize_data(UInt8[1, 2, 3], "binary")           # passthrough
```
"""
function _serialize_data(data::Any, type::String)
    if type == "json"
        # JSON: render to a string, then hand back its UTF-8 bytes.
        # BUGFIX: the original called `bytes(...)`, which does not exist in
        # Julia; `codeunits` is the standard byte view of a String.
        return Vector{UInt8}(codeunits(JSON.json(data)))
    elseif type == "table"
        # Tabular data: Arrow IPC stream written into an in-memory buffer.
        io = IOBuffer()
        Arrow.write(io, data)
        return take!(io)
    elseif type == "binary"
        # Already binary: accept a buffer or a raw byte vector.
        isa(data, IOBuffer) && return take!(data)
        isa(data, Vector{UInt8}) && return data
        error("Binary data must be binary (Vector{UInt8} or IOBuffer)")
    else
        error("Unknown type: $type")
    end
end


"""
    publish_message(nats_url, subject, message, correlation_id)

Publish a JSON `message` to `subject` on the NATS server at `nats_url`,
opening a connection for the single publish and draining it afterwards
so the message is flushed before the connection closes.
`correlation_id` is only used for trace logging.
"""
function publish_message(nats_url::String, subject::String, message::String, correlation_id::String)
    conn = NATS.connect(nats_url)           # one connection per publish (simple, stateless)
    try
        NATS.publish(conn, subject, message)
        log_trace(correlation_id, "Message published to $subject")
    finally
        NATS.drain(conn)                    # drain flushes pending messages, then closes
    end
end
"""
    smartreceive(msg::NATS.Msg; fileserver_url, max_retries, base_delay, max_delay)

Process a NATS message produced by `smartsend`.

For `"direct"` transport the Base64 payload embedded in the envelope is
decoded; for `"link"` transport the payload bytes are fetched from the
envelope's URL with exponential-backoff retries. The bytes are then
deserialized according to the envelope's `type`.

# Keyword Arguments:
 - `fileserver_url::String` - HTTP file server URL for link transport (default: `DEFAULT_FILESERVER_URL`)
 - `max_retries::Int` - Maximum retry attempts when fetching a URL (default: 5)
 - `base_delay::Int` - Initial backoff delay in ms (default: 100)
 - `max_delay::Int` - Maximum backoff delay in ms (default: 5000)

# Return:
 - `(data = deserialized_data, envelope = MessageEnvelope)`
"""
function smartreceive(
    msg::NATS.Msg;
    fileserver_url::String = DEFAULT_FILESERVER_URL,
    max_retries::Int = 5,
    base_delay::Int = 100,
    max_delay::Int = 5000
)
    # The NATS payload is the JSON-serialized envelope.
    env = MessageEnvelope(String(msg.data))
    log_trace(env.correlation_id, "Processing received message")

    # Obtain the raw payload bytes according to the transport.
    if env.transport == "direct"
        log_trace(env.correlation_id, "Direct transport - decoding payload")
        raw = base64decode(env.payload)
    elseif env.transport == "link"
        log_trace(env.correlation_id, "Link transport - fetching from URL")
        raw = _fetch_with_backoff(env.url, max_retries, base_delay, max_delay, env.correlation_id)
    else
        error("Unknown transport type: $(env.transport)")
    end

    # Both transports share the same deserialization and return shape.
    decoded = _deserialize_data(raw, env.type, env.correlation_id, env.metadata)
    return (data = decoded, envelope = env)
end


"""
    _fetch_with_backoff(url, max_retries, base_delay, max_delay, correlation_id)

Fetch `url` via HTTP GET, retrying up to `max_retries` times with
exponential backoff (delays in milliseconds, doubling up to `max_delay`).
Returns the response body as `Vector{UInt8}`; errors after the final
failed attempt.
"""
function _fetch_with_backoff(
    url::String,
    max_retries::Int,
    base_delay::Int,
    max_delay::Int,
    correlation_id::String
)
    wait_ms = base_delay
    attempt = 0
    while attempt < max_retries
        attempt += 1
        try
            response = HTTP.request("GET", url)
            # A non-200 status is raised here so it shares the retry path below.
            response.status == 200 || error("Failed to fetch: $(response.status)")
            log_trace(correlation_id, "Successfully fetched data from $url on attempt $attempt")
            return response.body
        catch err
            log_trace(correlation_id, "Attempt $attempt failed: $(typeof(err))")
            if attempt < max_retries
                sleep(wait_ms / 1000.0)              # delays are specified in ms
                wait_ms = min(wait_ms * 2, max_delay) # exponential backoff, capped
            end
        end
    end
    error("Failed to fetch data after $max_retries attempts")
end
"""
    _deserialize_data(data, type, correlation_id, metadata)

Convert serialized bytes back into Julia data according to `type`:
- `"json"`   : parse the UTF-8 bytes as JSON
- `"table"`  : read an Apache Arrow IPC stream (returns an `Arrow.Table`)
- `"binary"` : return the bytes unchanged

`correlation_id` and `metadata` are carried for logging/context and are not
otherwise interpreted here.
"""
function _deserialize_data(
    data::Vector{UInt8},
    type::String,
    correlation_id::String,
    metadata::Dict{String, Any}
)
    if type == "json"
        # BUGFIX: String(v::Vector{UInt8}) truncates/steals the vector's
        # contents; copy first so the caller's bytes are left intact.
        return JSON.parse(String(copy(data)))
    elseif type == "table"
        # NOTE: Arrow.Table is a lazy, zero-copy columnar view - it is NOT a
        # DataFrame. Wrap with DataFrame(...) at the call site if needed.
        return Arrow.Table(IOBuffer(data))
    elseif type == "binary"
        return data                     # binary payloads pass through untouched
    else
        error("Unknown type: $type")
    end
end


"""
    base64decode(str) -> Vector{UInt8}

Decode a base64-encoded string back to binary data. Thin wrapper around the
standard library so the module has a single, explicit decode entry point.
"""
function base64decode(str::String)
    # BUGFIX: the Base64 stdlib has no `Base64.decode`; the function is
    # `Base64.base64decode`. NOTE(review): this definition shadows the stdlib
    # export of the same name inside this module - confirm that is intended.
    return Base64.base64decode(str)
end
"""
    plik_oneshot_upload(fileServerURL, filename, data::Vector{UInt8})

Upload a raw byte array to a plik server in one-shot mode (no persistent
upload session).

Workflow:
1. POST `{"OneShot": true}` to `/upload` to obtain an upload ID and token
2. POST the bytes as multipart form data with the `X-UploadToken` header
3. Return identifiers and the download URL for the uploaded file

# Arguments:
 - `fileServerURL::String` - Base URL of the plik server (e.g. `"http://localhost:8080"`)
 - `filename::String` - Name to give the uploaded file
 - `data::Vector{UInt8}` - Raw file content

# Return:
 - Named tuple `(status, uploadid, fileid, url)` where `status` is the HTTP
   status of the upload request and `url` is the full download URL.
"""
function plik_oneshot_upload(fileServerURL::String, filename::String, data::Vector{UInt8})

    # ---------------------------- get upload id ----------------------------
    # curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
    url_getUploadID = "$fileServerURL/upload"
    headers = ["Content-Type" => "application/json"]
    body = """{ "OneShot" : true }"""
    # BUGFIX: `body_is_form` is not an HTTP.jl keyword argument; a plain
    # request with a string body is correct here.
    httpResponse = HTTP.request("POST", url_getUploadID, headers, body)
    # BUGFIX: HTTP response bodies are Vector{UInt8}; JSON.parse needs a String.
    responseJson = JSON.parse(String(httpResponse.body))
    uploadid = responseJson["id"]
    uploadtoken = responseJson["uploadToken"]

    # ----------------------------- upload file -----------------------------
    # curl -X POST --header "X-UploadToken: TOKEN" -F "file=@PATH" http://host/file/UPLOAD_ID
    # Plik will not accept a raw byte body, so wrap the bytes as multipart.
    file_multipart = HTTP.Multipart(filename, IOBuffer(data), "application/octet-stream")
    url_upload = "$fileServerURL/file/$uploadid"
    headers = ["X-UploadToken" => uploadtoken]
    form = HTTP.Form(Dict("file" => file_multipart))

    # BUGFIX: the original caught upload failures, logged them, and then
    # dereferenced a stale response - fail loudly instead so smartsend can
    # surface the error to its caller.
    httpResponse = HTTP.post(url_upload, headers, form)
    responseJson = JSON.parse(String(httpResponse.body))
    fileid = responseJson["id"]

    # e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
    url = "$fileServerURL/file/$uploadid/$fileid/$filename"

    return (status = httpResponse.status, uploadid = uploadid, fileid = fileid, url = url)
end
"""
    plik_oneshot_upload(fileServerURL, filepath::String)

Upload a file from disk to a plik server in one-shot mode (no persistent
upload session).

Workflow:
1. POST `{"OneShot": true}` to `/upload` to obtain an upload ID and token
2. POST the file at `filepath` as multipart form data with the
   `X-UploadToken` header
3. Return identifiers and the download URL for the uploaded file

# Arguments:
 - `fileServerURL::String` - Base URL of the plik server (e.g. `"http://localhost:8080"`)
 - `filepath::String` - Path to the local file to upload

# Return:
 - Named tuple `(status, uploadid, fileid, url)` where `status` is the HTTP
   status of the upload request and `url` is the full download URL.
"""
function plik_oneshot_upload(fileServerURL::String, filepath::String)

    filename = basename(filepath)

    # ---------------------------- get upload id ----------------------------
    # curl -X POST -d '{ "OneShot" : true }' http://localhost:8080/upload
    url_getUploadID = "$fileServerURL/upload"
    headers = ["Content-Type" => "application/json"]
    body = """{ "OneShot" : true }"""
    # BUGFIX: dropped the invalid `body_is_form` keyword (not an HTTP.jl kwarg).
    httpResponse = HTTP.request("POST", url_getUploadID, headers, body)
    # BUGFIX: JSON.parse needs a String, not a byte vector.
    responseJson = JSON.parse(String(httpResponse.body))
    uploadid = responseJson["id"]
    uploadtoken = responseJson["uploadToken"]

    # ----------------------------- upload file -----------------------------
    # curl -X POST --header "X-UploadToken: TOKEN" -F "file=@PATH" http://host/file/UPLOAD_ID
    url_upload = "$fileServerURL/file/$uploadid"
    headers = ["X-UploadToken" => uploadtoken]

    # BUGFIX: the original `open(filepath, "r")` handle was never closed, and
    # upload failures were logged then dereferenced; the do-block guarantees
    # cleanup and lets errors propagate to the caller.
    httpResponse = open(filepath, "r") do file_io
        form = HTTP.Form(Dict("file" => file_io))
        HTTP.post(url_upload, headers, form)
    end
    responseJson = JSON.parse(String(httpResponse.body))
    fileid = responseJson["id"]

    # e.g. "http://192.168.1.20:8080/file/3F62E/4AgGT/test.zip"
    url = "$fileServerURL/file/$uploadid/$fileid/$filename"

    return (status = httpResponse.status, uploadid = uploadid, fileid = fileid, url = url)
end

end # module

diff --git a/src/NATSBridge.js b/src/NATSBridge.js
+++ b/src/NATSBridge.js

/**
 * Bi-Directional Data Bridge - JavaScript Module
 * Implements SmartSend and SmartReceive for NATS communication
 */

const { v4: uuidv4 } = require('uuid');
// NOTE(review): base64-url is no longer used for payload encoding (it is not
// interoperable with standard base64 on the Julia side); kept for callers.
const { decode, encode } = require('base64-url');
const Arrow = require('apache-arrow');

// Constants
const DEFAULT_SIZE_THRESHOLD = 1_000_000; // 1MB claim-check threshold
const DEFAULT_NATS_URL = 'nats://localhost:4222';
const DEFAULT_FILESERVER_URL = 'http://localhost:8080/upload';

// Logging helper: timestamped trace line tagged with the correlation ID.
function logTrace(correlationId, message) {
  const timestamp = new Date().toISOString();
  console.log(`[${timestamp}] [Correlation: ${correlationId}] ${message}`);
}

// Message Envelope Class - mirrors the Julia MessageEnvelope fields.
class MessageEnvelope {
  constructor(options = {}) {
    this.correlation_id = options.correlation_id || uuidv4();
    this.type = options.type || 'json';
    this.transport = options.transport || 'direct';
    this.payload = options.payload || null;
    this.url = options.url || null;
    this.metadata = options.metadata || {};
  }

  // Parse a JSON envelope string (as published by either side of the bridge).
  static fromJSON(jsonStr) {
    const data = JSON.parse(jsonStr);
    return new MessageEnvelope({
      correlation_id: data.correlation_id,
      type: data.type,
      transport: data.transport,
      payload: data.payload || null,
      url: data.url || null,
      metadata: data.metadata || {}
    });
  }
+ url: data.url || null, + metadata: data.metadata || {} + }); + } + + toJSON() { + const obj = { + correlation_id: this.correlation_id, + type: this.type, + transport: this.transport + }; + + if (this.payload) { + obj.payload = this.payload; + } + + if (this.url) { + obj.url = this.url; + } + + if (Object.keys(this.metadata).length > 0) { + obj.metadata = this.metadata; + } + + return JSON.stringify(obj); + } +} + +// SmartSend for JavaScript - Handles transport selection based on payload size +async function SmartSend(subject, data, type = 'json', options = {}) { + const { + natsUrl = DEFAULT_NATS_URL, + fileserverUrl = DEFAULT_FILESERVER_URL, + sizeThreshold = DEFAULT_SIZE_THRESHOLD, + correlationId = uuidv4() + } = options; + + logTrace(correlationId, `Starting SmartSend for subject: ${subject}`); + + // Serialize data based on type + const payloadBytes = _serializeData(data, type, correlationId); + const payloadSize = payloadBytes.length; + + logTrace(correlationId, `Serialized payload size: ${payloadSize} bytes`); + + // Decision: Direct vs Link + if (payloadSize < sizeThreshold) { + // Direct path - Base64 encode and send via NATS + const payloadBase64 = encode(payloadBytes); + logTrace(correlationId, `Using direct transport for ${payloadSize} bytes`); + + const env = new MessageEnvelope({ + correlation_id: correlationId, + type: type, + transport: 'direct', + payload: payloadBase64, + metadata: { + content_length: payloadSize.toString(), + format: 'arrow_ipc_stream' + } + }); + + await publishMessage(natsUrl, subject, env.toJSON(), correlationId); + return env; + } else { + // Link path - Upload to HTTP server, send URL via NATS + logTrace(correlationId, `Using link transport, uploading to fileserver`); + + const url = await uploadToServer(payloadBytes, fileserverUrl, correlationId); + + const env = new MessageEnvelope({ + correlation_id: correlationId, + type: type, + transport: 'link', + url: url, + metadata: { + content_length: payloadSize.toString(), + 
format: 'arrow_ipc_stream' + } + }); + + await publishMessage(natsUrl, subject, env.toJSON(), correlationId); + return env; + } +} + +// Helper: Serialize data based on type +function _serializeData(data, type, correlationId) { + if (type === 'json') { + const jsonStr = JSON.stringify(data); + return Buffer.from(jsonStr, 'utf8'); + } else if (type === 'table') { + // Table data - convert to Arrow IPC stream + const writer = new Arrow.Writer(); + writer.writeTable(data); + return writer.toByteArray(); + } else if (type === 'binary') { + // Binary data - treat as binary + if (data instanceof Buffer) { + return data; + } else if (Array.isArray(data)) { + return Buffer.from(data); + } else { + throw new Error('Binary data must be binary (Buffer or Array)'); + } + } else { + throw new Error(`Unknown type: ${type}`); + } +} + +// Helper: Publish message to NATS +async function publishMessage(natsUrl, subject, message, correlationId) { + const { connect } = require('nats'); + + try { + const nc = await connect({ servers: [natsUrl] }); + await nc.publish(subject, message); + logTrace(correlationId, `Message published to ${subject}`); + nc.close(); + } catch (error) { + logTrace(correlationId, `Failed to publish message: ${error.message}`); + throw error; + } +} + +// SmartReceive for JavaScript - Handles both direct and link transport +async function SmartReceive(msg, options = {}) { + const { + fileserverUrl = DEFAULT_FILESERVER_URL, + maxRetries = 5, + baseDelay = 100, + maxDelay = 5000 + } = options; + + const env = MessageEnvelope.fromJSON(msg.data); + + logTrace(env.correlation_id, `Processing received message`); + + if (env.transport === 'direct') { + logTrace(env.correlation_id, `Direct transport - decoding payload`); + + const payloadBytes = decode(env.payload); + const data = _deserializeData(payloadBytes, env.type, env.correlation_id, env.metadata); + + return { data, envelope: env }; + } else if (env.transport === 'link') { + logTrace(env.correlation_id, `Link 
transport - fetching from URL`); + + const data = await _fetchWithBackoff(env.url, maxRetries, baseDelay, maxDelay, env.correlation_id); + const result = _deserializeData(data, env.type, env.correlation_id, env.metadata); + + return { data: result, envelope: env }; + } else { + throw new Error(`Unknown transport type: ${env.transport}`); + } +} + +// Helper: Fetch with exponential backoff +async function _fetchWithBackoff(url, maxRetries, baseDelay, maxDelay, correlationId) { + let delay = baseDelay; + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + const response = await fetch(url); + if (response.ok) { + const buffer = await response.arrayBuffer(); + logTrace(correlationId, `Successfully fetched data from ${url} on attempt ${attempt}`); + return new Uint8Array(buffer); + } else { + throw new Error(`Failed to fetch: ${response.status}`); + } + } catch (error) { + logTrace(correlationId, `Attempt ${attempt} failed: ${error.message}`); + + if (attempt < maxRetries) { + await new Promise(resolve => setTimeout(resolve, delay)); + delay = Math.min(delay * 2, maxDelay); + } + } + } + + throw new Error(`Failed to fetch data after ${maxRetries} attempts`); +} + +// Helper: Deserialize data based on type +async function _deserializeData(data, type, correlationId, metadata) { + if (type === 'json') { + const jsonStr = new TextDecoder().decode(data); + return JSON.parse(jsonStr); + } else if (type === 'table') { + // Deserialize Arrow IPC stream to Table + const table = Arrow.Table.from(data); + return table; + } else if (type === 'binary') { + // Return binary binary data + return data; + } else { + throw new Error(`Unknown type: ${type}`); + } +} + +// Export functions +module.exports = { + SmartSend, + SmartReceive, + MessageEnvelope +}; \ No newline at end of file diff --git a/test/scenario1_command_control.jl b/test/scenario1_command_control.jl new file mode 100644 index 0000000..d81da80 --- /dev/null +++ b/test/scenario1_command_control.jl @@ -0,0 
#!/usr/bin/env julia
# Scenario 1: Command & Control (Small JSON)
# Tests small JSON payloads (< 1MB) sent directly via NATS

using NATS
using JSON3
using UUIDs
using Dates    # BUGFIX: log_trace below calls Dates.now() but Dates was never loaded
using Base64   # BUGFIX: direct-transport payloads are base64-encoded

# Include the bridge module
include("../src/julia_bridge.jl")
using .BiDirectionalBridge

# Configuration
const CONTROL_SUBJECT = "control"
const RESPONSE_SUBJECT = "control_response"
const NATS_URL = "nats://localhost:4222"

# Create correlation ID for tracing
correlation_id = string(uuid4())

# Receiver: Listen for control commands
function start_control_listener()
    # BUGFIX: NATS.jl exposes `connect`, not a `Connection` constructor.
    conn = NATS.connect(NATS_URL)
    try
        NATS.subscribe(conn, CONTROL_SUBJECT) do msg
            log_trace(msg.data)

            # Parse the envelope
            env = MessageEnvelope(String(msg.data))

            # BUGFIX: direct-transport payloads are base64-encoded bytes, not
            # raw JSON - decode before parsing. Qualified call avoids clashing
            # with the bridge's own base64decode export.
            config = JSON3.read(String(Base64.base64decode(env.payload)))

            # Execute simulation with parameters
            step_size = config.step_size
            iterations = config.iterations

            sleep(0.1)  # simulate some work

            # Send acknowledgment
            response = Dict(
                "status" => "Running",
                "correlation_id" => env.correlation_id,
                "step_size" => step_size,
                "iterations" => iterations
            )

            # BUGFIX: JSON3 has no `stringify`; serialization is `JSON3.write`.
            NATS.publish(conn, RESPONSE_SUBJECT, JSON3.write(response))
            log_trace("Sent response: $(JSON3.write(response))")
        end

        sleep(5)  # keep listening for 5 seconds
    finally
        NATS.drain(conn)  # consistent with the bridge's connection teardown
    end
end

# Helper: Log with correlation ID
function log_trace(message)
    timestamp = Dates.now()
    println("[$timestamp] [Correlation: $correlation_id] $message")
end

# Run the listener
start_control_listener()

diff --git a/test/scenario1_command_control.js b/test/scenario1_command_control.js
+++ b/test/scenario1_command_control.js

#!/usr/bin/env node
// Scenario 1: Command & Control (Small JSON)
// Tests small JSON payloads (< 1MB) sent directly via NATS

// NOTE(review): the bridge module file is src/NATSBridge.js - confirm this path.
const { SmartSend } = require('../js_bridge');

// Configuration
CONTROL_SUBJECT = "control"; +const NATS_URL = "nats://localhost:4222"; + +// Create correlation ID for tracing +const correlationId = require('uuid').v4(); + +// Sender: Send control command to Julia +async function sendControlCommand() { + const config = { + step_size: 0.01, + iterations: 1000 + }; + + // Send via SmartSend with type="json" + const env = await SmartSend( + CONTROL_SUBJECT, + config, + "json", + { correlationId } + ); + + console.log(`Sent control command with correlation_id: ${correlationId}`); + console.log(`Envelope: ${JSON.stringify(env, null, 2)}`); +} + +// Run the sender +sendControlCommand().catch(console.error); \ No newline at end of file diff --git a/test/scenario2_large_table.jl b/test/scenario2_large_table.jl new file mode 100644 index 0000000..0c2d328 --- /dev/null +++ b/test/scenario2_large_table.jl @@ -0,0 +1,66 @@ +#!/usr/bin/env julia +# Scenario 2: Deep Dive Analysis (Large Arrow Table) +# Tests large Arrow tables (> 1MB) sent via HTTP fileserver + +using NATS +using Arrow +using DataFrames +using JSON3 +using UUIDs + +# Include the bridge module +include("../src/julia_bridge.jl") +using .BiDirectionalBridge + +# Configuration +const ANALYSIS_SUBJECT = "analysis_results" +const RESPONSE_SUBJECT = "analysis_response" +const NATS_URL = "nats://localhost:4222" + +# Create correlation ID for tracing +correlation_id = string(uuid4()) + +# Receiver: Listen for analysis results +function start_analysis_listener() + conn = NATS.Connection(NATS_URL) + try + NATS.subscribe(conn, ANALYSIS_SUBJECT) do msg + log_trace("Received message from $(msg.subject)") + + # Parse the envelope + env = MessageEnvelope(String(msg.data)) + + # Use SmartReceive to handle the data + result = SmartReceive(msg) + + # Process the data based on type + if result.envelope.type == "table" + df = result.data + log_trace("Received DataFrame with $(nrows(df)) rows") + log_trace("DataFrame columns: $(names(df))") + + # Send acknowledgment + response = Dict( + "status" 
=> "Processed", + "correlation_id" => env.correlation_id, + "row_count" => nrows(df) + ) + NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response)) + end + end + + # Keep listening for 10 seconds + sleep(10) + finally + NATS.close(conn) + end +end + +# Helper: Log with correlation ID +function log_trace(message) + timestamp = Dates.now() + println("[$timestamp] [Correlation: $correlation_id] $message") +end + +# Run the listener +start_analysis_listener() \ No newline at end of file diff --git a/test/scenario2_large_table.js b/test/scenario2_large_table.js new file mode 100644 index 0000000..dd0f41c --- /dev/null +++ b/test/scenario2_large_table.js @@ -0,0 +1,54 @@ +#!/usr/bin/env node +// Scenario 2: Deep Dive Analysis (Large Arrow Table) +// Tests large Arrow tables (> 1MB) sent via HTTP fileserver + +const { SmartSend } = require('../js_bridge'); + +// Configuration +const ANALYSIS_SUBJECT = "analysis_results"; +const NATS_URL = "nats://localhost:4222"; + +// Create correlation ID for tracing +const correlationId = require('uuid').v4(); + +// Sender: Send large Arrow table to Julia +async function sendLargeTable() { + // Create a large DataFrame-like structure (10 million rows) + // For testing, we'll create a smaller but still large table + const numRows = 1000000; // 1 million rows + + const data = { + id: Array.from({ length: numRows }, (_, i) => i + 1), + value: Array.from({ length: numRows }, () => Math.random()), + category: Array.from({ length: numRows }, () => ['A', 'B', 'C'][Math.floor(Math.random() * 3)]) + }; + + // Convert to Arrow Table + const { Table, Vector, RecordBatch } = require('apache-arrow'); + + const idVector = Vector.from(data.id); + const valueVector = Vector.from(data.value); + const categoryVector = Vector.from(data.category); + + const table = Table.from({ + id: idVector, + value: valueVector, + category: categoryVector + }); + + // Send via SmartSend with type="table" + const env = await SmartSend( + ANALYSIS_SUBJECT, + 
table, + "table", + { correlationId } + ); + + console.log(`Sent large table with ${numRows} rows`); + console.log(`Correlation ID: ${correlationId}`); + console.log(`Transport: ${env.transport}`); + console.log(`URL: ${env.url || 'N/A'}`); +} + +// Run the sender +sendLargeTable().catch(console.error); \ No newline at end of file diff --git a/test/scenario3_julia_to_julia.jl b/test/scenario3_julia_to_julia.jl new file mode 100644 index 0000000..42c21d3 --- /dev/null +++ b/test/scenario3_julia_to_julia.jl @@ -0,0 +1,66 @@ +#!/usr/bin/env julia +# Scenario 3: Julia-to-Julia Service Communication +# Tests bi-directional communication between two Julia services + +using NATS +using Arrow +using DataFrames +using JSON3 +using UUIDs + +# Include the bridge module +include("../src/julia_bridge.jl") +using .BiDirectionalBridge + +# Configuration +const SUBJECT1 = "julia_to_js" +const SUBJECT2 = "js_to_julia" +const RESPONSE_SUBJECT = "response" +const NATS_URL = "nats://localhost:4222" + +# Create correlation ID for tracing +correlation_id = string(uuid4()) + +# Julia-to-Julia Test: Large Arrow Table +function test_julia_to_julia_large_table() + conn = NATS.Connection(NATS_URL) + try + # Subscriber on SUBJECT2 to receive data from Julia sender + NATS.subscribe(conn, SUBJECT2) do msg + log_trace("[$(Dates.now())] Received on $SUBJECT2") + + # Use SmartReceive to handle the data + result = SmartReceive(msg) + + # Check transport type + if result.envelope.transport == "direct" + log_trace("Received direct transport with $(length(result.data)) bytes") + else + # For link transport, result.data is the URL + log_trace("Received link transport at $(result.data)") + end + + # Send response back + response = Dict( + "status" => "Processed", + "correlation_id" => result.envelope.correlation_id, + "timestamp" => Dates.now() + ) + NATS.publish(conn, RESPONSE_SUBJECT, JSON3.stringify(response)) + end + + # Keep listening + sleep(5) + finally + NATS.close(conn) + end +end + +# Helper: 
Log with correlation ID +function log_trace(message) + timestamp = Dates.now() + println("[$timestamp] [Correlation: $correlation_id] $message") +end + +# Run the test +test_julia_to_julia_large_table() \ No newline at end of file diff --git a/test/scenario_tests.md b/test/scenario_tests.md new file mode 100644 index 0000000..7b2df9a --- /dev/null +++ b/test/scenario_tests.md @@ -0,0 +1,148 @@ +# Test Scenarios for Bi-Directional Data Bridge + +## Scenario 1: Command & Control (Small JSON) +Tests small JSON payloads (< 1MB) sent directly via NATS. + +### Julia (Receiver) +```julia +using NATS +using JSON3 + +# Subscribe to control subject +subscribe(nats, "control") do msg + env = MessageEnvelope(String(msg.data)) + + # Parse JSON payload + config = JSON3.read(env.payload) + + # Execute simulation with parameters + step_size = config.step_size + iterations = config.iterations + + # Send acknowledgment + response = Dict("status" => "Running", "correlation_id" => env.correlation_id) + publish(nats, "control_response", JSON3.write(response)) +end +``` + +### JavaScript (Sender) +```javascript +const { SmartSend } = require('./js_bridge'); + +// Create small JSON config +const config = { + step_size: 0.01, + iterations: 1000 +}; + +// Send via SmartSend with type="json" +await SmartSend("control", config, "json"); +``` + +## Scenario 2: Deep Dive Analysis (Large Arrow Table) +Tests large Arrow tables (> 1MB) sent via HTTP fileserver.
+ +### Julia (Sender) +```julia +using Arrow +using DataFrames + +# Create large DataFrame (500MB, 10 million rows) +df = DataFrame( + id = 1:10_000_000, + value = rand(10_000_000), + category = rand(["A", "B", "C"], 10_000_000) +) + +# Convert to Arrow IPC stream and send +SmartSend("analysis_results", df, "table") +``` + +### JavaScript (Receiver) +```javascript +const { SmartReceive } = require('./js_bridge'); + +// Receive message with URL +const result = await SmartReceive(msg); + +// Fetch data from HTTP server +const table = result.data; + +// Load into Perspective.js or D3 +// Use table data for visualization +``` + +## Scenario 3: Live Binary Processing +Tests raw binary audio data sent from JS to Julia for FFT/transcription. + +### JavaScript (Sender) +```javascript +const { SmartSend } = require('./js_bridge'); + +// Capture audio chunk (2 seconds, 44.1kHz, 1 channel) +const binaryData = await navigator.mediaDevices.getUserMedia({ audio: true }); + +// Send as binary with metadata headers +await SmartSend("binary_input", binaryData, "binary", { + metadata: { + sample_rate: 44100, + channels: 1 + } +}); +``` + +### Julia (Receiver) +```julia +using WAV +using DSP + +# Receive binary data +function process_binary(data) + # Perform FFT or AI transcription + spectrum = fft(data) + + # Send results back (JSON + Arrow table) + results = Dict("transcription" => "sample text", "spectrum" => spectrum) + SmartSend("binary_output", results, "json") +end +``` + +## Scenario 4: Catch-Up (JetStream) +Tests temporal decoupling with NATS JetStream.
+ +### Julia (Producer) +```julia +# Publish to JetStream +using NATS + +function publish_health_status(nats) + jetstream = JetStream(nats, "health_updates") + + while true + status = Dict("cpu" => rand(), "memory" => rand()) + publish(jetstream, "health", status) + sleep(5) # Every 5 seconds + end +end +``` + +### JavaScript (Consumer) +```javascript +const { connect } = require('nats'); + +const nc = await connect({ servers: ['nats://localhost:4222'] }); +const js = nc.jetstream(); + +// Request replay from last 10 minutes +const consumer = await js.pullSubscribe("health", { + durable_name: "catchup", + max_batch: 100, + max_ack_wait: 30000 +}); + +// Process historical and real-time messages +for await (const msg of consumer) { + const result = await SmartReceive(msg); + // Process the data + msg.ack(); +} \ No newline at end of file diff --git a/test/test_large_payload.jl b/test/test_large_payload.jl new file mode 100644 index 0000000..af4350e --- /dev/null +++ b/test/test_large_payload.jl @@ -0,0 +1,121 @@ +#!/usr/bin/env julia +# Test script for large payload testing using binary transport +# Tests sending a large file (> 1MB) via smartsend with binary type + +using NATS, JSON, UUIDs, Dates + +# Include the bridge module +include("../src/NATSBridge.jl") +using .NATSBridge + +# Configuration +const SUBJECT = "/large_binary_test" +const NATS_URL = "nats.yiem.cc" +const FILESERVER_URL = "http://192.168.88.104:8080" + +# Create correlation ID for tracing +correlation_id = string(uuid4()) + +# File path for large binary payload test +const LARGE_FILE_PATH = "./test.zip" + +# Helper: Log with correlation ID +function log_trace(message) + timestamp = Dates.now() + println("[$timestamp] [Correlation: $correlation_id] $message") +end + +# Sender: Send large binary file via smartsend +function test_large_binary_send() + conn = NATS.connect(NATS_URL) + # Read the large file as binary data + log_trace("Reading large file: $LARGE_FILE_PATH") + file_data = 
read(LARGE_FILE_PATH) + + file_size = length(file_data) + log_trace("File size: $file_size bytes") + + # Use smartsend with binary type - will automatically use link transport + # if file size exceeds the threshold (1MB by default) + env = NATSBridge.smartsend( + SUBJECT, + file_data, + "binary", + nats_url = NATS_URL, + fileserver_url = FILESERVER_URL; + dataname="test.zip" + ) + + log_trace("Sent message with transport: $(env.transport)") + log_trace("Envelope type: $(env.type)") + + # Check if link transport was used + if env.transport == "link" + log_trace("Using link transport - file uploaded to HTTP server") + log_trace("URL: $(env.url)") + else + log_trace("Using direct transport - payload sent via NATS") + end + + NATS.drain(conn) +end + +# Receiver: Listen for messages and verify large payload handling +function test_large_binary_receive() + conn = NATS.connect(NATS_URL) + NATS.subscribe(conn, SUBJECT) do msg + log_trace("Received message on $(msg.subject)") + log_trace("Received message:\n$msg") + + # Use SmartReceive to handle the data + result = SmartReceive(msg) + + # Check transport type + if result.envelope.transport == "direct" + log_trace("Received direct transport with $(length(result.data)) bytes") + else + # For link transport, result.data is the URL + log_trace("Received link transport at $(result.data)") + end + + # Verify the received data matches the original + if result.envelope.type == "binary" + if isa(result.data, Vector{UInt8}) + file_size = length(result.data) + log_trace("Received $(file_size) bytes of binary data") + + # Save received data to a test file + output_path = "test_output.bin" + write(output_path, result.data) + log_trace("Saved received data to $output_path") + + # Verify file size + original_size = length(read(LARGE_FILE_PATH)) + if file_size == original_size + log_trace("SUCCESS: File size matches! Original: $original_size bytes") + else + log_trace("WARNING: File size mismatch! 
Original: $original_size, Received: $file_size") + end + end + end + end + + # Keep listening for 10 seconds + sleep(10) + NATS.drain(conn) +end + +# Run the test +println("Starting large binary payload test...") +println("Correlation ID: $correlation_id") +println("Large file: $LARGE_FILE_PATH") + +# Run sender first +println("start smartsend") +test_large_binary_send() + +# Run receiver +println("testing smartreceive") +test_large_binary_receive() + +println("Test completed.") \ No newline at end of file