154 lines
6.0 KiB
Julia
154 lines
6.0 KiB
Julia
module DB_services
|
|
|
|
""" version 0.2
|
|
"""
|
|
|
|
using DataStructures: count
|
|
export send_to_DB, data_prep_for_DB
|
|
|
|
using DataStructures
|
|
using JSON3
|
|
using Redis
|
|
using Random
|
|
using UUIDs
|
|
|
|
include("Utils.jl")
|
|
using .Utils
|
|
|
|
"""
|
|
Dummy iron_pen_ai for raw_data_db_service testing
|
|
"""
|
|
|
|
#------------------------------------------------------------------------------------------------100
|
|
|
|
|
|
""" Prepare model data for sending to raw_data_db_service by flattening all hierarchy
|
|
data structures inside model_data into a 1-level-deep (flat) JSON3 structure.
|
|
The output of this function is a vector of flattened payloads, one per knowledgeFn.
|
|
*** all parameter names that go to Cassandra must not contain capital letters ***
|
|
"""
|
|
function data_prep_for_DB(model_name::String, experiment_number::Int, episode_number::Int,
                          time_stamp::Int, model_data::OrderedDict)::Array{OrderedDict, 1}
    # Build one payload per entry of model_data[:m][:knowledgeFn].
    #
    # Arguments
    #   model_name        - copied into every payload and every neuron record
    #   experiment_number - copied into every payload and every neuron record
    #   episode_number    - copied into every payload and every neuron record
    #   time_stamp        - accepted for interface compatibility; not used here
    #   model_data        - must provide model_data[:m][:knowledgeFn] (iterable as
    #                       key => value pairs) and model_data[:m][:model_error]
    #
    # Returns
    #   A vector with one OrderedDict per knowledgeFn. Each payload holds, in order,
    #   :model_name, :knowledgeFn_name, :experiment_number, :episode_number and
    #   :neurons_list (the flattened neuron records of that knowledgeFn).
    #
    # NOTE(review): the payload key :knowledgeFn_name contains a capital letter, while
    # this function's documentation says names going to Cassandra must be lowercase.
    # It is left unchanged because the receiving service may depend on it - confirm.

    # Fields shared by every payload. All values are immutable (String / Int), so a
    # shallow `copy` per knowledgeFn is sufficient.
    payload_template = OrderedDict{Any, Any}(
        :model_name => model_name,
        :knowledgeFn_name => "none",
        :experiment_number => experiment_number,
        :episode_number => episode_number,
    )

    # Element type matches the declared return type, so returning needs no conversion.
    payloads = OrderedDict[]

    for (_, knowledgefn) in model_data[:m][:knowledgeFn]
        payload = copy(payload_template)
        payload[:knowledgeFn_name] = knowledgefn[:knowledgefn_name]
        # Kept as an untyped vector so the payload handed downstream is unchanged.
        payload[:neurons_list] = []

        for (array_name, neurons) in knowledgefn
            # Only the two neuron containers are flattened; other fields are skipped.
            (array_name == :neurons_array || array_name == :output_neurons_array) || continue

            for (neuron_key, neuron_data) in neurons
                # The :type entry describes the container, it is not a neuron.
                neuron_key == :type && continue

                # Convert to an OrderedDict so that extra fields can be added to the
                # neuron's own parameters.
                neuron = OrderedDict(neuron_data)

                neuron[:experiment_number] = experiment_number
                neuron[:episode_number] = episode_number

                # Neurons in neurons_array and output_neurons_array are named after
                # their position in the array they live in, so their names collide.
                # Prefix each name with its array name so both sets can be stored in
                # the same list.
                neuron[:neuron_name] = Symbol(array_name, "_", neuron_key)

                neuron[:model_error] = model_data[:m][:model_error]
                # `knowledgefn` is the value of the entry being iterated, so there is
                # no need to walk model_data again to reach its error field.
                neuron[:knowledgefn_error] = knowledgefn[:knowledgeFn_error]
                neuron[:model_name] = model_name

                push!(payload[:neurons_list], neuron)
            end
        end

        push!(payloads, payload)
    end

    return payloads
end
|
|
|
|
"""
    send_to_DB(model_name, experiment_number, episode_number, tick_number,
               model_json_string, redis_server_ip, pub_channel, sub_channel)

Parse `model_json_string` with `JSON3.read`, flatten it with `data_prep_for_DB`
(`tick_number` is passed as its `time_stamp` argument) and send every resulting payload
through `Utils.service_query`.

For each payload a "whois" message is sent first. The payload is sent in a
"process"/"insert" message only if the `:responding_to` field of the reply parses to the
same UUID as the `:topic_id` of the "whois" message; otherwise an error is raised.

`redis_server_ip`, `pub_channel` and `sub_channel` are forwarded unchanged to
`Utils.service_query`; `sub_channel` is also sent as the `:communication_channel` of
every message.

Returns `nothing`.
"""
function send_to_DB(model_name::String, experiment_number::Int, episode_number::Int,
                    tick_number::Int, model_json_string::String, redis_server_ip::String,
                    pub_channel::String, sub_channel::String)
    parsed_model = OrderedDict(JSON3.read(model_json_string))
    payloads = data_prep_for_DB(model_name, experiment_number, episode_number, tick_number,
                                parsed_model)

    for payload in payloads
        # Step 1: ask the raw data service to identify itself before sending data.
        whois_request = Dict(
            :sender => "ironpen_ai",
            :topic => "whois",
            :topic_id => uuid1(),
            :responding_to => nothing,               # filled in by the receiver
            :communication_channel => sub_channel,   # channel the reply is expected on
            :instruction => nothing,
            :payload => nothing,
            :isreturn => true,
        )
        reply = Utils.service_query(redis_server_ip, pub_channel, sub_channel, whois_request)

        # The reply must reference the id of the request that was just sent.
        if UUID(reply[:responding_to]) != whois_request[:topic_id]
            error("raw_data_db_service not respond")
        end

        # Step 2: hand the payload over for insertion.
        insert_request = Dict(
            :sender => "ironpen_ai",
            :topic => "process",
            :topic_id => uuid1(),
            :responding_to => nothing,               # filled in by the receiver
            :communication_channel => sub_channel,   # channel the reply is expected on
            :instruction => "insert",
            :payload => payload,
            :isreturn => false,
        )
        Utils.service_query(redis_server_ip, pub_channel, sub_channel, insert_request)
    end
end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
end # module end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|