no message

main
이범준 2 weeks ago
parent afb9ec392d
commit c90d428ef2

@ -1,489 +0,0 @@
#include "MessageSender.h"
#include "hi_ctrl.h"
#include "taglnk_frame.h"
#include "tagbl_struc.h"
char g_szProcessname[MAX_STRING_SIZE + 1];
char g_szProcessPath[PATH_MAX + 1];
int g_nResponseTimeout;
int cmd;
int type;
int job;
char szOrganizationID[6+1];
char szWorkType[4+1];
char szMsgType[4+1];
//char szSerialID[8 + 1] = "00000001";
//char szRequestDTime[14 + 1] = "20180917014900";
char szRespCode[] = "0000";
char szData[MAX_STRING_SIZE + 1];
char rtnData[MAX_STRING_SIZE + 1];
int g_nSerailNo = 1;
pthread_mutex_t g_sSerialMutex = PTHREAD_MUTEX_INITIALIZER;
static struct {
int cmd; /* CONTROL COMMAND */
int act; /* 0:송신 1:응답 2:3자통신 */
int work;
char *cmd_nm;
} lane_act[] = {
{ NS_RQ_SEQ_NO , 1, 0},/* 0x01 전송연번요구 */
{ NS_LANE_STAT , 0, 0},/* 0x75 차로상태(STATUS) */
{ NS_RPR_DATA , 0, 0},/* 0x76 보수데이터 */
{ NS_CLOSE_PASS , 0, 0},/* 0x73 야간계수 데이터 */
{ NS_WRK_START , 0, 0},/* 0x61 근무개시보고 */
{ NS_HND_DATA , 0, 0},/* 0x17 처리 데이터 */
{ NS_CRD_HND_DATA , 0, 0},/* 0x50 카드처리 데이터 */
{ NS_WRK_END , 0, 0},/* 0x28 근무종료데이터 */
{ NS_REMOTE_LANE_STAT_REPLY, 0, 0},/* 0xD4 원격차로상태응답 */
{ NS_TBL_ORDER , 3, 0},/* 0x24 테이블요구(백그라운드용) */
{ -1 , -1, -1 , " "}
};
int MessageSender(int kind, char *packet, char *path, char *rtn)
{
char szProcessname[PATH_MAX + 1];
char *pFindPointer;
struct rcv_lane_data *rd;
struct bl_struct *sd;
int nResult;
int i = 0;
//DaemonStart();
nResult = SafeStrCopy(szProcessname, sizeof(szProcessname), path);
if (nResult != SEC_ERR_SUCCESS)
{
printf("strcpy_s Error![%d]\n", nResult);
}
pFindPointer = szProcessname;
nResult = SafeStrrChr(szProcessname, strnlen_s(szProcessname, sizeof(szProcessname)), PATH_SEPERATOR, &pFindPointer);
if (pFindPointer == NULL)
{
nResult = SafeStrCopy(g_szProcessname, sizeof(g_szProcessname), szProcessname);
}
else
{
nResult = SafeStrCopy(g_szProcessname, sizeof(g_szProcessname), pFindPointer + 1);
}
GetExecPath(g_szProcessPath, sizeof(g_szProcessPath) - 1, path);
if (g_szProcessPath == NULL)
{
printf("Can't get process path[Cann't load config file]!\nExit Program!");
return -1;
}
lane_log("MessageSender Start\n");
nResult = InitMessageSenderConfig();
if (nResult != MQ_ERR_SUCCESS)
{
printf("Program terminated!!!\n");
return -1;
}
lane_log("--------------------------------------------------\n");
WriteLog(LogType_SystemOut,LogLevel_Info, "Process Started![%s]", g_szProcessname);
while(1)
{
if(lane_act[i].cmd == kind || lane_act[i].cmd == -1) break;
i++;
}
cmd = lane_act[i].cmd;
lane_log("lane_act[%d].cmd[%02x][%d]\n", i, lane_act[i].cmd, lane_act[i].act);
snprintf(szOrganizationID, 6+1, "%06d", lb_data2atoi(hi_comm_ctrl.bas_tbl.plz_id, 2));
lane_log("Packet[%s]\n", packet);
if(lane_act[i].act == 2)
snprintf(szWorkType, 4+1, "%.4s", "9998");
else if(lane_act[i].act == 1)
snprintf(szWorkType, 4+1, "%.4s", "9997");
else if(lane_act[i].act == 0)
snprintf(szWorkType, 4+1, "%.4s", "9999");
else if(lane_act[i].act == 3)
snprintf(szWorkType, 4+1, "%.4s", "0080");
else return -1;
if(lane_act[i].act != 3)
{
rd = (struct rcv_lane_data *)packet;
snprintf(szData, sizeof(struct rcv_lane_data)+1, "%s", (char *)rd);
printf("szData[%.2s] [%.2s]\n", szData, rd->plz_id);
}else{
sd = (struct bl_struct *)packet;
snprintf(szData, sizeof(struct bl_struct)+1, "%s", (char *)sd);
printf("szData[%.2s] [%.2s]\n", szData, sd->plz_id);
}
type = lane_act[i].act;
job = lane_act[i].work;
//lane_log("szWorkType[%s] [%s]\n", szWorkType, lane_act[i].cmd_nm);
snprintf(szMsgType , 4+1, "%.4s", "0010");
lane_log("처리할 자료[%s]\n", szData);
//2019.06.27 InitDoubleLinkedList Main 선언
InitThread_Before();
if(type == 0)
nResult = InitThread(ProcThread);
else
nResult = InitThreadWait(ProcThread);
lane_log("종료[%d]?[%s][%d]\n", type, rtnData, nResult);
sprintf(rtn, "%s", rtnData);
WriteLog(LogType_SystemOut,LogLevel_Info, "Process Stopped![%s]", g_szProcessname);
return 0;
}
int InitMessageSenderConfig()
{
char szConfigfilename[PATH_MAX + 1];
char szConfigfilenameTemp[PATH_MAX + 1];
char szValueTemp[MAX_STRING_SIZE + 1];
int nResult;
snprintf(szConfigfilenameTemp, sizeof(szConfigfilenameTemp), "%s%c%s%c", g_szProcessPath, PATH_SEPERATOR, CONF_FILE_PATH, PATH_SEPERATOR);
realpath(szConfigfilenameTemp, szConfigfilename);
snprintf(szValueTemp, sizeof(szValueTemp), "%c", PATH_SEPERATOR);
nResult = SafeStrCat(szConfigfilename, sizeof(szConfigfilename), szValueTemp, 1);
nResult = SafeStrCat(szConfigfilename, sizeof(szConfigfilename), g_szProcessname, strnlen_s(g_szProcessname, sizeof(g_szProcessname)));
nResult = SafeStrCat(szConfigfilename, sizeof(szConfigfilename), CONF_FILE_EXT, strnlen_s(CONF_FILE_EXT, sizeof(CONF_FILE_EXT)));
printf("Process Name : %s\n", g_szProcessname);
printf("Process Path : %s\n", g_szProcessPath);
printf("Config Filename : %s[%d]\n", szConfigfilename, nResult);
ReadConfigFile(true, szConfigfilename);
lane_log("Init 1\n");
// Read MS_RESPONSE_TIMEOUT_KEY
if (GetConfigValue(MS_RESPONSE_TIMEOUT_KEY, szValueTemp, sizeof(szValueTemp)) != CONF_ERR_SUCCESS)
{
WriteLog(LogType_ErrorOut, LogLevel_Error, "Socket MS_RESPONSE_TIMEOUT_KEY configuration not found!");
return SOCK_ERR_CONFIG;
}
lane_log("Init 2\n");
if (SafeStringToInt(szValueTemp, sizeof(szValueTemp), &g_nResponseTimeout) != CONF_ERR_SUCCESS)
{
WriteLog(LogType_ErrorOut, LogLevel_Error, "Socket MS_RESPONSE_TIMEOUT_KEY configuration error[ConfigFile:%s]!", szValueTemp);
return SOCK_ERR_CONFIG;
}
lane_log("Init 3\n");
// Read CONNECTION_TYPE
if (GetConfigValue(SOCK_CONF_CONN_TYPE_KEY, szValueTemp, sizeof(szValueTemp)) != CONF_ERR_SUCCESS)
{
WriteLog(LogType_ErrorOut, LogLevel_Error, "Socket CONNECTION_TYPE configuration not found[Available Value:Server or Client]!");
return SOCK_ERR_CONFIG;
}
lane_log("Init 4\n");
if (MappingStringToInt(szValueTemp, sizeof(szValueTemp), g_sConnTypeInfo, sizeof(g_sConnTypeInfo)/sizeof(struct VALUE_STRING_PAIR_INFO), &g_eConnectionType) != CONF_ERR_SUCCESS)
{
WriteLog(LogType_ErrorOut, LogLevel_Error, "Socket Connection Type configuration error[ConfigFile:%s,Available Value:Server or Client]!", szValueTemp);
return SOCK_ERR_CONFIG;
}
lane_log("Init 5\n");
if (InitLogConfig() != LOG_ERR_SUCCESS)
{
return MS_ERR_CONFIG;
}
lane_log("Init 6\n");
if (InitSenderMQConfig() != MQ_ERR_SUCCESS)
{
return MS_ERR_CONFIG;
}
lane_log("Init 7\n");
if (InitSender2MQConfig() != MQ_ERR_SUCCESS)
{
return MS_ERR_CONFIG;
}
lane_log("Init 8\n");
if (InitThreadConfig() != THREAD_ERR_SUCCESS)
{
return MS_ERR_CONFIG;
}
lane_log("Init 9\n");
return MS_ERR_SUCCESS;
}
void *ProcThread(void *pData)
{
char cOccurType;
struct THREAD_INFO *pThreadInfo;
struct THREAD_INFO SendThreadInfo;
char szMessageBuffer[MAX_STRING_SIZE + 1];
struct MESSAGE_INFO sMessageInfo;
struct MESSAGE_INFO sndMessageInfo;
int nResult;
int nRslt = 0;
int *pnThreadExitCode;
char szTopic[MAX_STRING_SIZE + 1];
char szTargetTopic[MAX_STRING_SIZE + 1];
char szReplyQueuename[MAX_STRING_SIZE + 1];
char szReply2Queuename[MAX_STRING_SIZE + 1];
char szExchangername[MAX_STRING_SIZE + 1];
uuid_t sUUIDTemp;
char szSendUUIDString[UUID_STRING_SIZE + 1];
char szSend2UUIDString[UUID_STRING_SIZE + 1];
char szRecvUUIDString[UUID_STRING_SIZE + 1];
char szRecv2UUIDString[UUID_STRING_SIZE + 1];
int nMessageLength;
//int nPacketLength;
int nSerialNo;
int nCompareResult;
//time_t sNow;
struct tm tNow;
int nMillisecond;
//struct timespec sRecvTimeSpec;
//time_t sRecvNow;
struct tm tRecvNow;
int nRecvMillisecond;
struct timeval sResponseTimeout;
uint64_t nDeliveryTag;
int nDiffSec;
int nDiffMilliSec;
long nDiffTime;
bool bMessageMatch;
bool bTimeout;
struct rcv_lane_data *rd;
rd = szData;
pThreadInfo = (struct THREAD_INFO *)pData;
SafeMemSet(&SendThreadInfo, sizeof(SendThreadInfo), 0, sizeof(SendThreadInfo));
pthread_mutex_init(&SendThreadInfo.sContextMutex, NULL);
pthread_mutex_init(&SendThreadInfo.sPublishMQ.sMQMutex, NULL);
pthread_mutex_init(&SendThreadInfo.sConsumeMQ.sMQMutex, NULL);
pnThreadExitCode = NULL;
nResult = SafeMemoryAlloc((void **)&pnThreadExitCode, sizeof(int));
SendThreadInfo.nThreadIndex = pThreadInfo->nThreadIndex;
SendThreadInfo.bThreadLoop = true;
//pnThreadExitCode = SafeAlloc(pnThreadExitCode, sizeof(int));
*pnThreadExitCode = 0;
//nSerialNo = 1;
lane_log("수신 Buf[%s]\n", rd->buf);
if (g_eConnectionType == SOCK_CONN_TYPE_SERVER)
{
cOccurType = 'S';
}
else
{
cOccurType = 'C';
}
/*
while (pThreadInfo->bThreadLoop)
{
*/
if (pThreadInfo->sPublishMQ.bMQConnection == false || pThreadInfo->sPublishMQ.bMQChannelOpen == false
|| pThreadInfo->sConsumeMQ.bMQConnection == false || pThreadInfo->sConsumeMQ.bMQChannelOpen == false)
{
#if 1
if (ConnectSenderMQ(pThreadInfo, true) != MQ_ERR_SUCCESS)
#else
if(ConnectSenderMultiMQ(pThreadInfo, true, type, 0) != MQ_ERR_SUCCESS)
#endif
{
WriteLog(LogType_StandardOut, LogLevel_Debug, "[Thread:%02d]Connect SenderMQ is failed!", pThreadInfo->nThreadIndex);
#if 0
continue;
#else
return pnThreadExitCode;
#endif
}
WriteLog(LogType_StandardOut, LogLevel_Debug, "[Thread:%02d]Connect SenderMQ is succeeded!!", pThreadInfo->nThreadIndex);
lane_log("Type [%d]\n", type);
#if 1
switch(type)
{
case 0:
case 3:
snprintf(szReplyQueuename, sizeof(szReplyQueuename), "%s%s.%02d", g_szMQMessageSendRespQueuename, szOrganizationID, pThreadInfo->nThreadIndex);
break;
case 1:
case 2:
snprintf(szReplyQueuename, sizeof(szReplyQueuename), "%s%s.%02d", g_szMQMessageSendRealQueuename, szOrganizationID, pThreadInfo->nThreadIndex);
break;
}
#else
snprintf(szReplyQueuename, sizeof(szReplyQueuename), "%s%s.%02d", g_szMQMessageSendRespQueuename, szOrganizationID, pThreadInfo->nThreadIndex);
#endif
}
if (SendThreadInfo.sPublishMQ.bMQConnection == false || SendThreadInfo.sPublishMQ.bMQChannelOpen == false
|| SendThreadInfo.sConsumeMQ.bMQConnection == false || SendThreadInfo.sConsumeMQ.bMQChannelOpen == false)
{
if(ConnectSenderMultiMQ(&SendThreadInfo, true, type, 1) != MQ_ERR_SUCCESS)
{
WriteLog(LogType_StandardOut, LogLevel_Debug, "[Thread:%02d]Connect SenderMQ is failed!", SendThreadInfo.nThreadIndex);
#if 0
continue;
#else
return pnThreadExitCode;
#endif
}
snprintf(szReply2Queuename, sizeof(szReply2Queuename), "%s%s.%02d", g_szMQMessageSendHqctQueuename, g_szMQTargetOrganizationCode, SendThreadInfo.nThreadIndex);
}
lane_log("Reply Que 1 [%s]\n", szReplyQueuename);
GetCurrTime(&tNow, &nMillisecond);
nSerialNo = GetSerialNo();
lane_log("Serial Que 2 [%d]\n", nSerialNo);
snprintf(szMessageBuffer, sizeof(szMessageBuffer), "%s%c%s%s%08d%04d%02d%02d%02d%02d%02d%s%s", szOrganizationID, cOccurType, szWorkType,
szMsgType, nSerialNo++, tNow.tm_year + 1900, tNow.tm_mon + 1, tNow.tm_mday, tNow.tm_hour, tNow.tm_min, tNow.tm_sec, szRespCode, szData);
nMessageLength = strnlen_s(szMessageBuffer, sizeof(szMessageBuffer));
sMessageInfo.nMessageLength = nMessageLength;
sMessageInfo.paMessageData = NULL;
SafeMemoryAlloc((void **)&sMessageInfo.paMessageData, sMessageInfo.nMessageLength + 5);
snprintf(sMessageInfo.paMessageData, sMessageInfo.nMessageLength + 5, "%04X", sMessageInfo.nMessageLength);
SafeStrCat(sMessageInfo.paMessageData + 4, sMessageInfo.nMessageLength + 1, szMessageBuffer, strnlen_s(szMessageBuffer, sizeof(szMessageBuffer)));
snprintf(szTopic, sizeof(szTopic), "%s.%s%s", szOrganizationID, szWorkType, szMsgType);
#ifdef _WIN32
uuid_generate(&sUUIDTemp);
uuid_unparse(&sUUIDTemp, szSendUUIDString);
#else
uuid_generate(sUUIDTemp);
uuid_unparse(sUUIDTemp, szSendUUIDString);
#endif
lane_log("TEST 2 Exchange[%s]\n", g_szMQMessageSendExchangername);
lane_log("TEST 3 Topic[%s]\n", szTopic);
lane_log("TEST 4 Reply[%s][%d]\n", szReplyQueuename, pThreadInfo->sPublishMQ.bMQChannelOpen);
nResult = PutMessageToQueue(pThreadInfo->sPublishMQ.sMQConnectionState, &pThreadInfo->sPublishMQ.bMQConnection, &pThreadInfo->sPublishMQ.bMQChannelOpen, pThreadInfo->nThreadIndex,
g_szMQMessageSendExchangername, szReplyQueuename, szTopic, &sMessageInfo, sMessageInfo.paMessageData, sMessageInfo.nMessageLength,
szSendUUIDString, sizeof(szSendUUIDString), false, NULL);
printf("SendData[%d] : [Thread:%02d]%s\n", nResult, pThreadInfo->nThreadIndex, sMessageInfo.paMessageData);
if (nResult != MS_ERR_SUCCESS)
{
// 메시지 전송 오류인 경우 처리
// 오류인 경우 오류 로그에 추가하는 형태의 샘플이지만 DB 처리라면 재전송 여부를 결정해서 재전송 몇번 이내면 재전송 하고 그렇지 않으면 실패 처리하는 로직이 필요하다.
WriteLog(LogType_StandardOut, LogLevel_Info, "[Thread:%02d]MQ Data Publish Error[Data:%s]!", pThreadInfo->nThreadIndex, sMessageInfo.paMessageData);
}
else
{
WriteQueueLog(LogType_MQOut, LogLevel_Info, pThreadInfo->nThreadIndex, 'P', g_szMQMessageSendExchangername, szTopic, szSendUUIDString, sMessageInfo.nMessageLength, "%s",
sMessageInfo.paMessageData);
WriteLog(LogType_StandardOut, LogLevel_Info, "[Thread:%02d]MQ Data Publish[Data:%s]!", pThreadInfo->nThreadIndex, sMessageInfo.paMessageData);
bMessageMatch = false;
bTimeout = false;
if (g_bBypassMode)
{
do
{
GetCurrTime(&tRecvNow, &nRecvMillisecond);
nDiffTime = TimeDiff(&tNow, nMillisecond, &tRecvNow, nRecvMillisecond);
nDiffSec = nDiffTime / 1000;
nDiffMilliSec = nDiffTime % 1000;
if (nDiffMilliSec < 0)
{
nDiffSec--;
nDiffMilliSec += 1000;
}
sResponseTimeout.tv_sec = g_nResponseTimeout / 1000 - nDiffSec;
sResponseTimeout.tv_usec = (g_nResponseTimeout % 1000 - nDiffMilliSec) * 1000;
if (sResponseTimeout.tv_usec < 0)
{
sResponseTimeout.tv_sec--;
sResponseTimeout.tv_usec += 1000000;
}
if (sResponseTimeout.tv_sec < 0 || sResponseTimeout.tv_usec < 0)
{
WriteLog(LogType_StandardOut, LogLevel_Info, "[Thread:%02d]Response Timeout! - UUID:%s, Message:%s, ProcessTime:%d.%03ds\n", pThreadInfo->nThreadIndex, szSendUUIDString,
sMessageInfo.paMessageData, nDiffSec, nDiffMilliSec);
bTimeout = true;
break;
}
memset(rtnData, 0x00, sizeof(rtnData));
nResult = GetMessageFromQueue(pThreadInfo->sConsumeMQ.sMQConnectionState, szExchangername, sizeof(szExchangername), szMessageBuffer, sizeof(szMessageBuffer),
szRecvUUIDString, sizeof(szRecvUUIDString), szReplyQueuename, sizeof(szReplyQueuename), &pThreadInfo->sConsumeMQ.bMQConnection,
&pThreadInfo->sConsumeMQ.bMQChannelOpen, &nDeliveryTag, &sResponseTimeout);
//lane_log("Get Msg[%s][%d]\n", szMessageBuffer, nResult);
nResult = pthread_tryjoin_np(pThreadInfo->nThreadID, NULL);
if(nResult == '0')
{
RemoveTerminatedThread();
nResult = pthread_kill(pThreadInfo, SIGINT);
pthread_mutex_unlock(&pThreadInfo->sContextMutex);
//nResult = DoubleLinkedListRemoveCurrentNode(&g_sConnectedSocketThreadInfo, NULL);
SafeFree((void **)&pThreadInfo);
//SafeFree((void **)&pListNode);
}
snprintf(rtnData, strnlen_s(szMessageBuffer+45, sizeof(szMessageBuffer)-45)+1, "%s", szMessageBuffer+45);
lane_log("Rtn Data[%s]\n", rtnData);
GetCurrTime(&tRecvNow, &nRecvMillisecond);
nDiffTime = TimeDiff(&tNow, nMillisecond, &tRecvNow, nRecvMillisecond);
nDiffSec = nDiffTime / 1000;
nDiffMilliSec = nDiffTime % 1000;
if (nDiffMilliSec < 0)
{
nDiffSec--;
nDiffMilliSec += 1000;
}
if (nResult == MQ_ERR_TIMEOUT)
{
//printf("Response Timeout! - UUID:%s, Message:%s, ProcessTime:%d.%03ds\n", );
WriteLog(LogType_StandardOut, LogLevel_Info, "[Thread:%02d]Response Timeout! - UUID:%s, Message:%s, ProcessTime:%d.%03ds\n",
pThreadInfo->nThreadIndex, szSendUUIDString, sMessageInfo.paMessageData, nDiffSec, nDiffMilliSec);
bTimeout = true;
break;
}
printf("TEST11[%d][%d][%s]\n", type, nResult, rtnData);
nResult = SafeStrCmp(szRecvUUIDString, sizeof(szRecvUUIDString), szSendUUIDString, &nCompareResult);
if (nCompareResult != 0)
{
WriteLog(LogType_StandardOut, LogLevel_Info, "[Thread:%02d]Recv UUID is not matched! - UUID:[Send:%s, Recv:%s], Message:%s, ProcessTime:%d.%03ds\n", pThreadInfo->nThreadIndex, szSendUUIDString, szRecvUUIDString, sMessageInfo.paMessageData, nDiffSec, nDiffMilliSec);
amqp_basic_ack(pThreadInfo->sConsumeMQ.sMQConnectionState, 1, nDeliveryTag, 0);
continue;
}
bMessageMatch = true;
nMessageLength = strnlen_s(szMessageBuffer, sizeof(szMessageBuffer));
amqp_basic_ack(pThreadInfo->sConsumeMQ.sMQConnectionState, 1, nDeliveryTag, 0);
WriteQueueLog(LogType_MQOut, LogLevel_Info, pThreadInfo->nThreadIndex, 'C', szReplyQueuename, "", szRecvUUIDString, nMessageLength, "%s", szMessageBuffer);
WriteLog(LogType_StandardOut, LogLevel_Info, "[Thread:%02d]MQ Data Consume[Data:%s] - ProcessTime:%d.%03ds!", pThreadInfo->nThreadIndex, szMessageBuffer, nDiffSec, nDiffMilliSec);
} while (bMessageMatch == false && bTimeout == false);
if (!bTimeout)
{
printf("센터 전송 옴[%d]?\n", type);
// 전송 완료 처리 필요
}else
{
memset(rtnData, 0x00, sizeof(rtnData));
snprintf(rtnData, sizeof(rtnData), "%02x0002", cmd);
}
}
}
//sleep(1); // test
/*
}
*/
CloseMQ(pThreadInfo->nThreadIndex, pThreadInfo->sPublishMQ.sMQConnectionState, &pThreadInfo->sPublishMQ.bMQConnection, &pThreadInfo->sPublishMQ.bMQChannelOpen, &pThreadInfo->sPublishMQ.sMQMutex);
CloseMQ(pThreadInfo->nThreadIndex, pThreadInfo->sConsumeMQ.sMQConnectionState, &pThreadInfo->sConsumeMQ.bMQConnection, &pThreadInfo->sConsumeMQ.bMQChannelOpen, &pThreadInfo->sConsumeMQ.sMQMutex);
return pnThreadExitCode;
}
int GetSerialNo()
{
int nSerialNo;
pthread_mutex_lock(&g_sSerialMutex);
nSerialNo = g_nSerailNo++;
pthread_mutex_unlock(&g_sSerialMutex);
return nSerialNo;
}

@ -1,702 +0,0 @@
#include "MessageSender.h"
#include "hi_ctrl.h"
#include "taglnk_frame.h"
#include "tagbl_struc.h"
#include <iconv.h>
#include "iss_comp.h"
char g_szProcessname[MAX_STRING_SIZE + 1];
char g_szProcessPath[PATH_MAX + 1];
extern int g_nResponseTimeout;
extern int cmd;
extern int type;
extern int job;
extern char szOrganizationID[6+1];
extern char szWorkType[4+1];
extern char szMsgType[4+1];
extern char szRespCode[] = "0000";
//char *szData;
char rcvData[MAX_STRING_SIZE + 1];
char szData[MAX_STRING_SIZE + 1];
char rtnData[MAX_STRING_SIZE + 1];
extern int g_nSerailNo = 1;
pthread_mutex_t g_sSerialMutex = PTHREAD_MUTEX_INITIALIZER;
static struct {
int cmd; /* CONTROL COMMAND */
int act; /* 0:송신 1:응답 2:3자통신 */
int work;
char *cmd_nm;
} lane_act[] = {
{ NS_RPR_DATA , 0, 0 , " 0x76 보수데이터 "},
{ NS_CLOSE_PASS , 0, 5 , " 0x73 야간계수 데이터 "},
{ NS_WRK_START , 0, 0 , " 0x61 근무개시보고 "},
{ NS_HND_DATA , 0, 2 , " 0x17 처리 데이터 "},
{ NS_CRD_HND_DATA , 0, 3 , " 0x50 카드처리 데이터 "},
{ NS_WRK_END , 0, 4 , " 0x28 근무종료데이터 "},
//{ NS_REMOTE_LANE_STAT_REPLY, 2, 0 , " 0xD4 원격차로상태응답 "},
{ NS_TBL_ORDER , 3, 0 , " 0x24 테이블요구(백그라운드용) "},
{ NS_REFUND_DATA , 0, 2 , " 0x40 환불처리 데이터 "},
{ NS_LANE_STAT , 1, 1 , " 0x75 차로상태(STATUS) "},
{ -1 , -1, -1 , " "}
};
static struct {
uchar plz_id[2];
int lane_no;
int tbl_order;
} lane_info[20] =
{
{"01", 1, 0},
{"01", 2, 0},
{"01", 3, 0},
{"01", 4, 0},
{"01", 5, 0},
{"01", 6, 0},
{"01", 7, 0},
{"01", 8, 0},
{"03", 1, 0},
{"03", 2, 0},
{"03", 3, 0},
{"03", 4, 0},
{"03", 5, 0},
{"03", 6, 0},
{"03", 7, 0},
{"03", 8, 0},
{"03", 9, 0},
{"03", 10, 0},
{"00", -1, -1}
};
int MessageSender(int kind, char *packet, char *path, char *rtn)
{
char szProcessname[PATH_MAX + 1];
char *sDataMalloc;
struct write_info wi;
struct rcv_lane_data *rd;
struct bl_struct *sd;
int nResult;
int i = 0;
//DaemonStart();
/* //2019.07.04 InitMessageQueue에 포함
nResult = SafeStrCopy(szProcessname, sizeof(szProcessname), path);
if (nResult != SEC_ERR_SUCCESS)
{
printf("strcpy_s Error![%d]\n", nResult);
}
pFindPointer = szProcessname;
nResult = SafeStrrChr(szProcessname, strnlen_s(szProcessname, sizeof(szProcessname)), PATH_SEPERATOR, &pFindPointer);
if (pFindPointer == NULL)
{
nResult = SafeStrCopy(g_szProcessname, sizeof(g_szProcessname), szProcessname);
}
else
{
nResult = SafeStrCopy(g_szProcessname, sizeof(g_szProcessname), pFindPointer + 1);
}
GetExecPath(g_szProcessPath, sizeof(g_szProcessPath) - 1, path);
if (g_szProcessPath == NULL)
{
printf("Can't get process path[Cann't load config file]!\nExit Program!");
return -1;
}
*/
lane_log("MessageSender Start\n");
/* // 2019.07.04 lane_main에서 1번만 실행하도록 수정
nResult = InitMessageSenderConfig(path);
if (nResult != MQ_ERR_SUCCESS)
{
printf("Program terminated!!!\n");
return -1;
}
*/
//lane_log("---------------------------[%s]-----------------------\n", packet);
//WriteLog(LogType_SystemOut,LogLevel_Info, "Process Started![%s]", g_szProcessname);
memset(szData, 0x00, sizeof(szData));
memset(rcvData, 0x00, sizeof(rcvData));
memset(rtnData, 0x00, sizeof(rtnData));
sprintf(wi.plz_id, "%.2s", PLZ_CODE2);
nResult = lb_freadx(&wi, FNO_BASICTBL, &hi_comm_ctrl.bas_tbl, FFIRST);
if (nResult < 0){
err_log("기초사항-F을 읽을 수 없습니다.", 1);
}
//lane_log("plz_id[%.2s]\n", hi_comm_ctrl.bas_tbl.plz_id);
while(1)
{
if(lane_act[i].cmd == kind || lane_act[i].cmd == -1) break;
i++;
}
cmd = lane_act[i].cmd;
lane_log("lane_act[%d].cmd[%02x][%d]\n", i, lane_act[i].cmd, lane_act[i].act);
snprintf(szOrganizationID, 6+1, "%06d", lb_data2atoi(hi_comm_ctrl.bas_tbl.plz_id, 2));
//lane_log("Packet[%s]\n", packet);
//sDataMalloc = szData;
//szData = (char *)malloc(MAX_STRING_SIZE +1);
//SafeMemoryAlloc((void **)&sDataMalloc, MAX_STRING_SIZE+ 1);
if(lane_act[i].act == 3)
{
sd = (struct bl_struct *)packet;
snprintf(szData, sizeof(struct bl_struct)+1, "%s", (char *)sd);
snprintf(rcvData, sizeof(struct bl_struct)+1, "%s", (char *)sd);
//lane_log("3 szData[%.2s] [%.2s]\n", szData, sd->plz_id);
}else if(lane_act[i].act == 1)
{
sprintf(szData, "%s", packet);
sprintf(rcvData, "%s", packet);
//lane_log("1 szData[%s]\n", szData);
}else{
rd = (struct rcv_lane_data *)packet;
snprintf(szData, sizeof(struct rcv_lane_data)+1, "%s", (char *)rd);
snprintf(rcvData, sizeof(struct rcv_lane_data)+1, "%s", (char *)rd);
//lane_log("else szData[%.2s] [%.2s]\n", szData, rd->plz_id);
}
if(lane_act[i].act == 2)
snprintf(szWorkType, 4+1, "%.4s", "9998");
else if(lane_act[i].act == 1)
snprintf(szWorkType, 4+1, "%04d", lane_act[i].work);
else if(lane_act[i].act == 0)
snprintf(szWorkType, 4+1, "%.4s", "9999");
else if(lane_act[i].act == 3)
snprintf(szWorkType, 4+1, "%.2s%.2s", sd->plz_id, sd->lane_no);
else{
//SafeFree((void **)&sDataMalloc);
return -1;
}
type = lane_act[i].act;
job = lane_act[i].work;
lane_log("szWorkType[%s] [%s]\n", szWorkType, lane_act[i].cmd_nm);
snprintf(szMsgType , 4+1, "%.4s", "0010");
//lane_log("처리할 자료[%s]\n", szData);
//2019.06.27 InitDoubleLinkedList Main 선언
InitThread_Before();
lane_log("Queue 초기화\n");
// if(type == 0)
nResult = InitThread(ProcThread);
// else
// nResult = InitThreadWait(ProcThread);
lane_log("종료[%d]?[%s][%d]\n", type, rtnData, nResult);
sprintf(rtn, "%s", rtnData);
//SafeFree((void **)&sDataMalloc);
//WriteLog(LogType_SystemOut,LogLevel_Info, "Process Stopped![%s]", g_szProcessname);
return 0;
}
int InitMessageSenderConfig(char *path)
{
char szConfigfilename[PATH_MAX + 1];
char szConfigfilenameTemp[PATH_MAX + 1];
char szValueTemp[MAX_STRING_SIZE + 1];
char szProcessname[PATH_MAX + 1];
char *pFindPointer;
int nResult;
nResult = SafeStrCopy(szProcessname, sizeof(szProcessname), path);
if (nResult != SEC_ERR_SUCCESS)
{
printf("strcpy_s Error![%d]\n", nResult);
}
pFindPointer = szProcessname;
nResult = SafeStrrChr(szProcessname, strnlen_s(szProcessname, sizeof(szProcessname)), PATH_SEPERATOR, &pFindPointer);
if (pFindPointer == NULL)
{
nResult = SafeStrCopy(g_szProcessname, sizeof(g_szProcessname), szProcessname);
}
else
{
nResult = SafeStrCopy(g_szProcessname, sizeof(g_szProcessname), pFindPointer + 1);
}
GetExecPath(g_szProcessPath, sizeof(g_szProcessPath) - 1, path);
if (g_szProcessPath == NULL)
{
printf("Can't get process path[Cann't load config file]!\nExit Program!");
return -1;
}
snprintf(szConfigfilenameTemp, sizeof(szConfigfilenameTemp), "%s%c%s%c", g_szProcessPath, PATH_SEPERATOR, CONF_FILE_PATH, PATH_SEPERATOR);
realpath(szConfigfilenameTemp, szConfigfilename);
snprintf(szValueTemp, sizeof(szValueTemp), "%c", PATH_SEPERATOR);
nResult = SafeStrCat(szConfigfilename, sizeof(szConfigfilename), szValueTemp, 1);
nResult = SafeStrCat(szConfigfilename, sizeof(szConfigfilename), g_szProcessname, strnlen_s(g_szProcessname, sizeof(g_szProcessname)));
nResult = SafeStrCat(szConfigfilename, sizeof(szConfigfilename), CONF_FILE_EXT, strnlen_s(CONF_FILE_EXT, sizeof(CONF_FILE_EXT)));
printf("Process Name : %s\n", g_szProcessname);
printf("Process Path : %s\n", g_szProcessPath);
printf("Config Filename : %s[%d]\n", szConfigfilename, nResult);
ReadConfigFile(true, szConfigfilename);
// Read MS_RESPONSE_TIMEOUT_KEY
if (GetConfigValue(MS_RESPONSE_TIMEOUT_KEY, szValueTemp, sizeof(szValueTemp)) != CONF_ERR_SUCCESS)
{
WriteLog(LogType_ErrorOut, LogLevel_Error, "Socket MS_RESPONSE_TIMEOUT_KEY configuration not found!");
return SOCK_ERR_CONFIG;
}
if (SafeStringToInt(szValueTemp, sizeof(szValueTemp), &g_nResponseTimeout) != CONF_ERR_SUCCESS)
{
WriteLog(LogType_ErrorOut, LogLevel_Error, "Socket MS_RESPONSE_TIMEOUT_KEY configuration error[ConfigFile:%s]!", szValueTemp);
return SOCK_ERR_CONFIG;
}
// Read CONNECTION_TYPE
if (GetConfigValue(SOCK_CONF_CONN_TYPE_KEY, szValueTemp, sizeof(szValueTemp)) != CONF_ERR_SUCCESS)
{
WriteLog(LogType_ErrorOut, LogLevel_Error, "Socket CONNECTION_TYPE configuration not found[Available Value:Server or Client]!");
return SOCK_ERR_CONFIG;
}
if (MappingStringToInt(szValueTemp, sizeof(szValueTemp), g_sConnTypeInfo, sizeof(g_sConnTypeInfo)/sizeof(struct VALUE_STRING_PAIR_INFO), &g_eConnectionType) != CONF_ERR_SUCCESS)
{
WriteLog(LogType_ErrorOut, LogLevel_Error, "Socket Connection Type configuration error[ConfigFile:%s,Available Value:Server or Client]!", szValueTemp);
return SOCK_ERR_CONFIG;
}
if (InitLogConfig() != LOG_ERR_SUCCESS)
{
return MS_ERR_CONFIG;
}
if (InitSenderMQConfig() != MQ_ERR_SUCCESS)
{
return MS_ERR_CONFIG;
}
if (InitSender2MQConfig() != MQ_ERR_SUCCESS)
{
return MS_ERR_CONFIG;
}
if (InitThreadConfig() != THREAD_ERR_SUCCESS)
{
return MS_ERR_CONFIG;
}
return MS_ERR_SUCCESS;
}
void *ProcThread(void *pData)
{
char cOccurType;
struct THREAD_INFO *pThreadInfo;
struct THREAD_INFO SendThreadInfo;
char szMessageBuffer[MAX_STRING_SIZE + 1];
struct MESSAGE_INFO sMessageInfo;
struct MESSAGE_INFO sndMessageInfo;
int nResult;
int nRslt = 0, retry = 0;
int *pnThreadExitCode;
amqp_queue_purge_ok_t *pQueuePurgeResult;
char szTopic[MAX_STRING_SIZE + 1];
char szTargetTopic[MAX_STRING_SIZE + 1];
char szReplyQueuename[MAX_STRING_SIZE + 1];
char szReply2Queuename[MAX_STRING_SIZE + 1];
char szExchangername[MAX_STRING_SIZE + 1];
uuid_t sUUIDTemp;
char szSendUUIDString[UUID_STRING_SIZE + 1];
char szSend2UUIDString[UUID_STRING_SIZE + 1];
char szRecvUUIDString[UUID_STRING_SIZE + 1];
char szRecv2UUIDString[UUID_STRING_SIZE + 1];
char orgCarNo[255+1];
char szCarNo[255+1];
char putCarNp[16];
char nowTbl[2+1];
char procData[MAX_STRING_SIZE + 1]; memset(procData, 0x00, sizeof(procData));
char webData[MAX_STRING_SIZE + 1]; memset(webData, 0x00, sizeof(webData));
int nMessageLength;
//int nPacketLength;
int nSerialNo;
int nCompareResult, size, ii;
//time_t sNow;
struct tm tNow;
int nMillisecond;
//struct timespec sRecvTimeSpec;
//time_t sRecvNow;
struct tm tRecvNow;
int nRecvMillisecond;
struct timeval sResponseTimeout;
uint64_t nDeliveryTag;
int nDiffSec;
int nDiffMilliSec;
long nDiffTime;
bool bMessageMatch;
bool bTimeout;
struct hi_lane_stat *hs;
struct hi_wrk_end *we;
struct hi_wrk_start *ws;
struct hi_crd_hnd_data *crd;
struct hi_hnd_data *hnd;
size_t in_size;
size_t out_size;
size_t out_buf_left;
char* in_buf;
char* out_buf;
char in_fare[2];
char in_booth[2];
char tmp_chg[20];
#if 1
FILE *fp;
char path[128];
int flen = 0;
memset(path, 0x00, sizeof(path));
#endif
//lane_log("##### Rcv Data[%s] ##### \n", rcvData);
memcpy(procData, szData, sizeof(procData));
//memcpy(nowTbl, tbl_order, sizeof(nowTbl));
memset(orgCarNo, 0x00, sizeof(orgCarNo));
memset(szCarNo, 0x00, sizeof(szCarNo));
lane_log("MessageSender 1\n");
pThreadInfo = (struct THREAD_INFO *)pData;
SafeMemSet(&SendThreadInfo, sizeof(SendThreadInfo), 0, sizeof(SendThreadInfo));
pthread_mutex_init(&SendThreadInfo.sContextMutex, NULL);
pthread_mutex_init(&SendThreadInfo.sPublishMQ.sMQMutex, NULL);
pthread_mutex_init(&SendThreadInfo.sConsumeMQ.sMQMutex, NULL);
pnThreadExitCode = NULL;
nResult = SafeMemoryAlloc((void **)&pnThreadExitCode, sizeof(int));
SendThreadInfo.nThreadIndex = pThreadInfo->nThreadIndex;
SendThreadInfo.bThreadLoop = true;
//pnThreadExitCode = SafeAlloc(pnThreadExitCode, sizeof(int));
*pnThreadExitCode = 0;
//nSerialNo = 1;
lane_log("MessageSender 2\n");
if (g_eConnectionType == SOCK_CONN_TYPE_SERVER)
{
cOccurType = 'S';
}
else
{
cOccurType = 'C';
}
/*
while (pThreadInfo->bThreadLoop)
{
*/
lane_log("MessageSender 3\n");
if (pThreadInfo->sPublishMQ.bMQConnection == false || pThreadInfo->sPublishMQ.bMQChannelOpen == false
|| pThreadInfo->sConsumeMQ.bMQConnection == false || pThreadInfo->sConsumeMQ.bMQChannelOpen == false)
{
#if 0
while(1)
{
#endif
lane_log("[Thread:%02d]Connect SenderMQ Connecting [%d]\n", pThreadInfo->nThreadIndex, retry);
if (ConnectSenderMQ(pThreadInfo, true) != MQ_ERR_SUCCESS)
{
lane_log("[Thread:%02d]Connect SenderMQ is failed!\n", pThreadInfo->nThreadIndex);
#if 1
sprintf(path, "/app_data/tcs/FILE/RECV/Retry_%04d%02d%02d%02d%02d%02d", tNow.tm_year + 1900, tNow.tm_mon + 1, tNow.tm_mday, tNow.tm_hour, tNow.tm_min, tNow.tm_sec);
unlink(path);
fp = fopen(path, "r+");
if(fp < 0)
{
lane_log("%s:파일생성 실패[%d]", path, errno);
WriteLog(LogType_StandardOut, LogLevel_Info, "[Thread:%02d]Connect Fail & File Create Fail! - Message:[%s]\n",
pThreadInfo->nThreadIndex, procData);
return(-1);
}
flen = strlen(procData);
procData[flen] = '\n';
procData[flen+1] = '\0';
nRslt = fputs(procData, fp);
fclose(fp);
#endif
pThreadInfo->nRunState = SOCK_THREAD_STATE_TERMINATED;
pthread_mutex_destroy(&pThreadInfo->sContextMutex);
pthread_mutex_destroy(&pThreadInfo->sPublishMQ.sMQMutex);
pthread_mutex_destroy(&pThreadInfo->sConsumeMQ.sMQMutex);
SafeFree((void **)&pThreadInfo);
SafeFree((void **)&pnThreadExitCode);
#if 0
pThreadInfo = (struct THREAD_INFO *)pData;
pnThreadExitCode = NULL;
nResult = SafeMemoryAlloc((void **)&pnThreadExitCode, sizeof(int));
SendThreadInfo.nThreadIndex = pThreadInfo->nThreadIndex;
continue;
#else
lane_log("[Thread:%02d]Connect SenderMQ File Create[%d]!\n", pThreadInfo->nThreadIndex, flen);
return(-1);
#endif
}else
{
lane_log("[Thread:%02d]Connect SenderMQ Reconnecting is Success!\n", pThreadInfo->nThreadIndex);
//break;
}
#if 0
}
#endif
//WriteLog(LogType_StandardOut, LogLevel_Debug, "[Thread:%02d]Connect SenderMQ is succeeded!!", pThreadInfo->nThreadIndex);
lane_log("Type [%d]\n", type);
#if 1
switch(type)
{
case 0:
case 2:
snprintf(szReplyQueuename, sizeof(szReplyQueuename), "%s%s.%02d", g_szMQMessageSendRespQueuename, szOrganizationID, pThreadInfo->nThreadIndex);
break;
case 3:
snprintf(szReplyQueuename, sizeof(szReplyQueuename), "%s%s.%02d", g_szMQMessageSendHqctQueuename, szOrganizationID, pThreadInfo->nThreadIndex);
break;
case 1:
snprintf(szReplyQueuename, sizeof(szReplyQueuename), "%s%s.%02d", g_szMQMessageSendRealQueuename, szOrganizationID, pThreadInfo->nThreadIndex);
//snprintf(szReplyQueuename, sizeof(szReplyQueuename), "%s%s.%02d", g_szMQMessageSendHqctQueuename, szOrganizationID, pThreadInfo->nThreadIndex);
break;
}
#else
snprintf(szReplyQueuename, sizeof(szReplyQueuename), "%s%s.%02d", g_szMQMessageSendRespQueuename, szOrganizationID, pThreadInfo->nThreadIndex);
#endif
}
lane_log("Reply Que 1 [%s]\n", szReplyQueuename);
//sleep(4);
GetCurrTime(&tNow, &nMillisecond);
nSerialNo = GetSerialNo();
lane_log("Serial Que 2 [%d]\n", nSerialNo);
//sleep(4);
snprintf(szMessageBuffer, sizeof(szMessageBuffer), "%s%c%s%s%08d%04d%02d%02d%02d%02d%02d%s%s", szOrganizationID, cOccurType, szWorkType,
szMsgType, nSerialNo++, tNow.tm_year + 1900, tNow.tm_mon + 1, tNow.tm_mday, tNow.tm_hour, tNow.tm_min, tNow.tm_sec, szRespCode, procData);
//free(szData);
nMessageLength = strnlen_s(szMessageBuffer, sizeof(szMessageBuffer));
sMessageInfo.nMessageLength = nMessageLength;
sMessageInfo.paMessageData = NULL;
lane_log("Aloc Before\n");
//sleep(4);
SafeMemoryAlloc((void **)&sMessageInfo.paMessageData, sMessageInfo.nMessageLength + 5);
lane_log("Aloc After\n");
//sleep(4);
snprintf(sMessageInfo.paMessageData, sMessageInfo.nMessageLength + 5, "%04X", sMessageInfo.nMessageLength);
SafeStrCat(sMessageInfo.paMessageData + 4, sMessageInfo.nMessageLength + 1, szMessageBuffer, strnlen_s(szMessageBuffer, sizeof(szMessageBuffer)));
lane_log("MessageSender 5\n");
//sleep(4);
snprintf(szTopic, sizeof(szTopic), "%s.%s%s", szOrganizationID, szWorkType, szMsgType);
#ifdef _WIN32
uuid_generate(&sUUIDTemp);
uuid_unparse(&sUUIDTemp, szSendUUIDString);
#else
//lane_log("A2\n");
//sleep(4);
uuid_generate(sUUIDTemp);
//lane_log("A3\n");
//sleep(4);
uuid_unparse(sUUIDTemp, szSendUUIDString);
#endif
lane_log("TEST 2 Exchange[%s]\n", g_szMQMessageSendExchangername);
lane_log("TEST 3 Topic[%s]\n", szTopic);
//lane_log("TEST 4 Reply[%s][%d]\n", szReplyQueuename, pThreadInfo->sPublishMQ.bMQChannelOpen);
//sleep(4);
lane_log("MessageSender 6\n");
#if 1 // 백그라운드 전송 시, 언제나 최신을 처리하기 위한 기존 데이터 삭제(TEST 필)
//lane_log("Back Purge 0[%d][%s][%.2s]\n", type, nowTbl, procData+12);
if(type == 3)
{
memset(in_fare, 0x00, sizeof(in_fare));
memset(in_booth, 0x00, sizeof(in_booth));
memset(tmp_chg, 0x00, sizeof(tmp_chg));
memcpy(in_fare, procData, 2);
memcpy(in_booth, procData+2, 2);
memcpy(tmp_chg, procData+12, 2);
for(ii = 0; ii < 20; ii++)
{
if(strncmp(lane_info[ii].plz_id, in_fare, 2) == 0 && lane_info[ii].lane_no == lb_data2atoi(in_booth, 2))
break;
}
}
lane_log("MessageSender 7\n");
//memcpy(nowTbl, lane_info[ii].tbl_order, sizeof(nowTbl));
if(type == 3 && lane_info[ii].tbl_order == lb_data2atoi(tmp_chg, 2))
{
lane_log("Back Purge 1[%x]\n", pThreadInfo->sPublishMQ.sMQConnectionState);
pQueuePurgeResult = amqp_queue_purge(pThreadInfo->sPublishMQ.sMQConnectionState, 1, amqp_cstring_bytes(g_szMQMessageSendHqctQueuename));
lane_log("Back Purge 2[%x]\n", pThreadInfo->sPublishMQ.sMQConnectionState);
//sleep(1);
}else
lane_info[ii].tbl_order = lb_data2atoi(tmp_chg, 2);
#endif
lane_log("MessageSender 8\n");
nResult = PutMessageToQueue(pThreadInfo->sPublishMQ.sMQConnectionState, &pThreadInfo->sPublishMQ.bMQConnection, &pThreadInfo->sPublishMQ.bMQChannelOpen, pThreadInfo->nThreadIndex,
g_szMQMessageSendExchangername, szReplyQueuename, szTopic, &sMessageInfo, sMessageInfo.paMessageData, sMessageInfo.nMessageLength,
szSendUUIDString, sizeof(szSendUUIDString), false, NULL);
//lane_log("Put After\n");
//sleep(2);
lane_log("SendData[%d] : [Thread:%02d]%s\n", nResult, pThreadInfo->nThreadIndex, sMessageInfo.paMessageData);
if (nResult != MS_ERR_SUCCESS)
{
lane_log("[Thread:%02d]MQ Data Publish Error[Data:%s]!", pThreadInfo->nThreadIndex, sMessageInfo.paMessageData);
// 메시지 전송 오류인 경우 처리
// 오류인 경우 오류 로그에 추가하는 형태의 샘플이지만 DB 처리라면 재전송 여부를 결정해서 재전송 몇번 이내면 재전송 하고 그렇지 않으면 실패 처리하는 로직이 필요하다.
WriteLog(LogType_StandardOut, LogLevel_Info, "[Thread:%02d]MQ Data Publish Error[Data:%s]!", pThreadInfo->nThreadIndex, sMessageInfo.paMessageData);
}
else
{
//snprintf((char *)tbl_order, 2+1, "%.2s", szData+12);
lane_log("[Thread:%02d]MQ Data Publish[Data:%s]!", pThreadInfo->nThreadIndex, sMessageInfo.paMessageData);
bMessageMatch = false;
bTimeout = false;
#if 1 //2019.06.28
if(type == 0)
{
lane_log("Send type[%d] End Loot\n", type);
CloseMQ(pThreadInfo->nThreadIndex, pThreadInfo->sPublishMQ.sMQConnectionState, &pThreadInfo->sPublishMQ.bMQConnection, &pThreadInfo->sPublishMQ.bMQChannelOpen, &pThreadInfo->sPublishMQ.sMQMutex);
CloseMQ(pThreadInfo->nThreadIndex, pThreadInfo->sConsumeMQ.sMQConnectionState, &pThreadInfo->sConsumeMQ.bMQConnection, &pThreadInfo->sConsumeMQ.bMQChannelOpen, &pThreadInfo->sConsumeMQ.sMQMutex);
//sleep(2);
lane_log("Send type[%d] End\n", type);
pThreadInfo->nRunState = SOCK_THREAD_STATE_TERMINATED;
pthread_mutex_destroy((char *)&pThreadInfo->sContextMutex);
pthread_mutex_destroy((char *)&pThreadInfo->sPublishMQ.sMQMutex);
pthread_mutex_destroy((char *)&pThreadInfo->sConsumeMQ.sMQMutex);
lane_log("Mutex destroy[%d]\n", type);
//sleep(2);
SafeFree((void **)&pThreadInfo);
SafeFree((void **)&sMessageInfo.paMessageData);
SafeFree((void **)&pnThreadExitCode);
lane_log("SafeFree[%d]\n", type);
//sleep(2);
return(nResult);
}
#endif
if (g_bBypassMode)
{
do
{
GetCurrTime(&tRecvNow, &nRecvMillisecond);
nDiffTime = TimeDiff(&tNow, nMillisecond, &tRecvNow, nRecvMillisecond);
nDiffSec = nDiffTime / 1000;
nDiffMilliSec = nDiffTime % 1000;
if (nDiffMilliSec < 0)
{
nDiffSec--;
nDiffMilliSec += 1000;
}
sResponseTimeout.tv_sec = g_nResponseTimeout / 1000 - nDiffSec;
sResponseTimeout.tv_usec = (g_nResponseTimeout % 1000 - nDiffMilliSec) * 1000;
if (sResponseTimeout.tv_usec < 0)
{
sResponseTimeout.tv_sec--;
sResponseTimeout.tv_usec += 1000000;
}
if (sResponseTimeout.tv_sec < 0 || sResponseTimeout.tv_usec < 0)
{
WriteLog(LogType_StandardOut, LogLevel_Info, "[Thread:%02d]Response Timeout! - UUID:%s, Message:%s, ProcessTime:%d.%03ds\n", pThreadInfo->nThreadIndex, szSendUUIDString,
sMessageInfo.paMessageData, nDiffSec, nDiffMilliSec);
bTimeout = true;
break;
}
memset(rtnData, 0x00, sizeof(rtnData));
memset(procData, 0x00, sizeof(procData));
nResult = GetMessageFromQueue(pThreadInfo->sConsumeMQ.sMQConnectionState, szExchangername, sizeof(szExchangername), szMessageBuffer, sizeof(szMessageBuffer),
szRecvUUIDString, sizeof(szRecvUUIDString), szReplyQueuename, sizeof(szReplyQueuename), &pThreadInfo->sConsumeMQ.bMQConnection,
&pThreadInfo->sConsumeMQ.bMQChannelOpen, &nDeliveryTag, &sResponseTimeout);
//lane_log("Get Msg[%s][%d]\n", szMessageBuffer, nResult);
snprintf(rtnData, strnlen_s(szMessageBuffer+45, sizeof(szMessageBuffer)-45)+1, "%s", szMessageBuffer+45);
//lane_log("Rtn Data[%s]\n", rtnData);
GetCurrTime(&tRecvNow, &nRecvMillisecond);
nDiffTime = TimeDiff(&tNow, nMillisecond, &tRecvNow, nRecvMillisecond);
nDiffSec = nDiffTime / 1000;
nDiffMilliSec = nDiffTime % 1000;
if (nDiffMilliSec < 0)
{
nDiffSec--;
nDiffMilliSec += 1000;
}
if (nResult == MQ_ERR_TIMEOUT)
{
//printf("Response Timeout! - UUID:%s, Message:%s, ProcessTime:%d.%03ds\n", );
WriteLog(LogType_StandardOut, LogLevel_Info, "[Thread:%02d]Response Timeout! - UUID:%s, Message:%s, ProcessTime:%d.%03ds\n",
pThreadInfo->nThreadIndex, szSendUUIDString, sMessageInfo.paMessageData, nDiffSec, nDiffMilliSec);
bTimeout = true;
break;
}
//printf("TEST11[%d][%d][%s]\n", type, nResult, rtnData);
nResult = SafeStrCmp(szRecvUUIDString, sizeof(szRecvUUIDString), szSendUUIDString, &nCompareResult);
if (nCompareResult != 0)
{
WriteLog(LogType_StandardOut, LogLevel_Info, "[Thread:%02d]Recv UUID is not matched! - UUID:[Send:%s, Recv:%s], Message:%s, ProcessTime:%d.%03ds\n", pThreadInfo->nThreadIndex, szSendUUIDString, szRecvUUIDString, sMessageInfo.paMessageData, nDiffSec, nDiffMilliSec);
amqp_basic_ack(pThreadInfo->sConsumeMQ.sMQConnectionState, 1, nDeliveryTag, 0);
continue;
}
bMessageMatch = true;
nMessageLength = strnlen_s(szMessageBuffer, sizeof(szMessageBuffer));
amqp_basic_ack(pThreadInfo->sConsumeMQ.sMQConnectionState, 1, nDeliveryTag, 0);
WriteQueueLog(LogType_MQOut, LogLevel_Info, pThreadInfo->nThreadIndex, 'C', szReplyQueuename, "", szRecvUUIDString, nMessageLength, "%s", szMessageBuffer);
WriteLog(LogType_StandardOut, LogLevel_Info, "[Thread:%02d]MQ Data Consume[Data:%s] - ProcessTime:%d.%03ds!", pThreadInfo->nThreadIndex, szMessageBuffer, nDiffSec, nDiffMilliSec);
} while (bMessageMatch == false && bTimeout == false);
if (!bTimeout)
{
//printf("센터 전송 옴[%d]?\n", type);
// 전송 완료 처리 필요
}else
{
memset(rtnData, 0x00, sizeof(rtnData));
snprintf(rtnData, sizeof(rtnData), "%02x0002", cmd);
}
}
}
//sleep(1); // test
/*
}
*/
CloseMQ(pThreadInfo->nThreadIndex, pThreadInfo->sPublishMQ.sMQConnectionState, &pThreadInfo->sPublishMQ.bMQConnection, &pThreadInfo->sPublishMQ.bMQChannelOpen, &pThreadInfo->sPublishMQ.sMQMutex);
CloseMQ(pThreadInfo->nThreadIndex, pThreadInfo->sConsumeMQ.sMQConnectionState, &pThreadInfo->sConsumeMQ.bMQConnection, &pThreadInfo->sConsumeMQ.bMQChannelOpen, &pThreadInfo->sConsumeMQ.sMQMutex);
pThreadInfo->nRunState = SOCK_THREAD_STATE_TERMINATED;
#if 1 //2019.07.12 메모리 증가로 Mutex Unlock 후 삭제 부분 추가
pthread_mutex_destroy(&pThreadInfo->sContextMutex);
pthread_mutex_destroy(&pThreadInfo->sPublishMQ.sMQMutex);
pthread_mutex_destroy(&pThreadInfo->sConsumeMQ.sMQMutex);
#endif
SafeFree((void **)&pThreadInfo);
SafeFree((void **)&sMessageInfo.paMessageData);
SafeFree((void **)&pnThreadExitCode);
return 0;
}
int GetSerialNo()
{
int nSerialNo;
pthread_mutex_lock(&g_sSerialMutex);
nSerialNo = g_nSerailNo++;
pthread_mutex_unlock(&g_sSerialMutex);
return nSerialNo;
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save