python里的bytes与UE里的FString数据交互
// 以utf-8编码作为约定的交互标准
def Hello(self, endpoint: bytes):
"""
用户上报自己的地址信息
Args:
endpoint (bytes): 地址信息. 格式为: {"client_type": "$client_type", "ip":"$ip", "port":$port}
Returns:
None
"""
print('Hello:')
endpoint_obj = json.loads(endpoint)
with self._lock_endpoints:
if endpoint_obj not in self._endpoints:
print(endpoint)
self._endpoints.append(endpoint_obj)
def GetMessage(self, msg_id: bytes) -> bytes:
"""
根据消息ID获取消息内容。
Args:
msg_id (bytes): 消息ID,类型为bytes。
Returns:
bytes: 消息内容,类型为bytes。如果找不到对应的消息,则返回空bytes。
"""
print('GetMessage:')
msg_idS = msg_id.decode()
if msg_idS in self._meta_msgpairs:
return bytes(self._meta_msgpairs[msg_idS], 'utf-8')
return bytes()
def GetAllMessages(self) -> bytes:
"""
获取所有消息并返回字节类型。
Args:
无
Returns:
bytes: 包含所有消息的字节类型数据。
"""
print('GetAllMessages:')
return json.dumps(self._meta_msgpairs).encode('utf-8')
bool UApplyLutGameInstanceSubsystem::GetAllMessages( TMap<FString, FString>& Messages )
{
if (p_client)
{
auto result = p_client->call("GetAllMessages").as<std::string>();
FString MappingJsonStr(result.size(), (const UTF8CHAR*)result.data()); // UTF8 bytes转FString
// Parsing Json-formatted string
TSharedRef<TJsonReader<>> JsonReader = TJsonReaderFactory<>::Create(MappingJsonStr);
TSharedPtr<FJsonObject> RootJsonObj = MakeShareable(new FJsonObject);
if (FJsonSerializer::Deserialize(JsonReader, RootJsonObj))
{
for(auto iter : RootJsonObj->Values)
{
Messages.Add(iter.Key, iter.Value->AsString());
}
}
else
{
UKismetSystemLibrary::PrintString(GetWorld(), TEXT("GetAllMessages::FJsonSerializer::Deserialize Failed!"));
return false;
}
return !MappingJsonStr.IsEmpty();
}
return false;
}
bool UApplyLutGameInstanceSubsystem::GetMessage( const FString& MsgId, FString& MsgValue )
{
if (p_client)
{
auto result = p_client->call("GetMessage", FSTRING_TO_STD(MsgId)).as<std::string>();
MsgValue = FString(result.size(), (const UTF8CHAR*)result.data());
return !MsgValue.IsEmpty();
}
return false;
}
FString MetaData;
FTCHARToUTF8 MetaUtf8(*MetaData);
NDI_video_frame.p_metadata = MetaUtf8.Get();
#define FSTRING_TO_STD(a) (std::string(TCHAR_TO_UTF8(*a)))
TSharedRef<TJsonReader<>> JsonReader = TJsonReaderFactory<>::Create(JsonListStr);
TArray<TSharedPtr<FJsonValue>> InArray;
if (FJsonSerializer::Deserialize(JsonReader, InArray))
{
for(const auto& iter : InArray)
{
const auto& jsonObject = iter->AsObject().ToSharedRef();
FString Out;
TSharedRef<TJsonWriter<>> JsonWriter = TJsonWriterFactory<>::Create(&Out);
if (!FJsonSerializer::Serialize(jsonObject, JsonWriter))
{
UKismetSystemLibrary::PrintString(GetWorld(), TEXT("GetAllLuts::FJsonSerializer::Serialize Failed!"));
return false;
}
LutParams.Add(Out);
}
}
else
{
UKismetSystemLibrary::PrintString(GetWorld(), TEXT("GetAllLuts::FJsonSerializer::Deserialize Failed!"));
return false;
}
// Parsing Json-formatted string
TSharedRef<TJsonReader<>> JsonReader = TJsonReaderFactory<>::Create(MappingJsonStr);
TSharedPtr<FJsonObject> RootJsonObj = MakeShareable(new FJsonObject);
if (FJsonSerializer::Deserialize(JsonReader, RootJsonObj))
{
for(auto iter : RootJsonObj->Values)
{
Messages.Add(iter.Key, iter.Value->AsString());
}
}
else
{
UKismetSystemLibrary::PrintString(GetWorld(), TEXT("GetAllMessages::FJsonSerializer::Deserialize Failed!"));
return false;
}
https://docs.python.org/3/library/json.html
# 该module总是产生str对象,而不是bytes对象。
import json
""" Encoding """
# json.dump(obj, fp, ......): 将obj作为一个JSON-formatted stream序列化到fp(a .write()-supporting file-like object)。 因为json模块产生的是str对象,所以fp.write() must support str input.
from io import StringIO
io = StringIO()
json.dump(['streaming API'], io)
io.getvalue() # return: '["streaming API"]'
# json.dumps(obj, *, .......): 将obj作为一个JSON-formatted str进行序列化
json.dumps(['foo', {'bar': ('baz', None, 1.0, 2)}])
json.dumps({"c": 0, "b": 0, "a": 0}, sort_keys=True)
json.dumps([1, 2, 3, {'4': 5, '6': 7}], separators=(',', ':'))
json.dumps({'4': 5, '6': 7}, sort_keys=True, indent=4)
""" Decoding """
# json.load(fp, *, ......): 解序列化fp (a .read()-supporting text file or binary file containing a JSON document) 到一个Python对象
from io import StringIO
io = StringIO('["streaming API"]')
json.load(io)
with open(filepath, 'r') as jsonfile:
jsonobj = json.load(jsonfile)
# json.loads(s, ......): Deserialize s (a str, bytes or bytearray instance containing a JSON document) to a Python object
json.loads('["foo", {"bar":["baz", null, 1.0, 2]}]')
import chardet
# 首先二进制方式打开文件
with open(absPath, 'rb') as frb:
# 检测编码方式
cur_encoding = chardet.detect(frb.read())['encoding']
# 指定文件编码方式
with open(absPath, 'r', encoding=cur_encoding) as fr:
Content = fr.read()