#pragma once #include "JDefines.h" #include #include #include #include #include #include #include #include "UsbCmdWriter.h" #include "UsbCmdListener.h" #include "ByteCommand.h" #include "JTerminal.h" namespace Incart::Usb { class ByteCommandQueue: public QObject { Q_OBJECT private: const uint m_cmdParcerMaximumFrameSize = 100; const uint32_t m_uidCounterMax = 100000; // after this number uid counter = 0 UsbCmdWriter* const m_cmdWriter; UsbCmdListener* const m_cmdListener; std::queue> m_commands; std::shared_ptr m_executedCommand = nullptr; QMutex m_commandMutex; QTimer m_commandTimeoutTimer; std::atomic m_isDeviceConnected; uint32_t m_uidCounter = 0; QMutex m_uidMutex; public: ByteCommandQueue(UsbCmdWriter* const cmdWriter, UsbCmdListener* const cmdListener, QObject* parent = nullptr) : QObject(parent) , m_cmdWriter(cmdWriter) , m_cmdListener(cmdListener) , m_isDeviceConnected(false) { // НЕЛЬЗЯ ИСПОЛЬЗОВАТЬ emit m_cmdListener->signalAnswer внутри кода ByteCommandQueue - может привести к deadlock connect(m_cmdListener, &UsbCmdListener::signalAnswer, this, &ByteCommandQueue::handleCommandAnswerThreadSafe, Qt::QueuedConnection); connect(&m_commandTimeoutTimer, SIGNAL(timeout()), this, SLOT(handleCommandTimeout())); m_commandTimeoutTimer.setSingleShot(true); m_commandTimeoutTimer.stop(); } ~ByteCommandQueue() { m_commandTimeoutTimer.stop(); } signals: void signalCommandError(std::string commandName, std::vector sendBytes, std::vector answer, int errorCode); public: uint32_t generateUid() { QMutexLocker locker(&m_uidMutex); m_uidCounter++; if (m_uidCounter > m_uidCounterMax) { m_uidCounter = 0; } return m_uidCounter; } // [Thread: not only owned ByteCommandQueue] void enqueue(std::shared_ptr command) { QMutexLocker locker(&m_commandMutex); m_commands.push(command); executeNextCommand(); } // [Thread: not only owned ByteCommandQueue] void handleDisconnect() { QMutexLocker locker(&m_commandMutex); closeExecutedCommand(DeviceCommand::EStatus::DEVICE_DISCONNECTED); m_isDeviceConnected.store(false); } // [Thread: not only owned ByteCommandQueue] void setIsDeviceConnected(bool status) { m_isDeviceConnected.store(status); } private slots: // [Thread: not only owned ByteCommandQueue] void handleCommandAnswerThreadSafe(std::vector answer, int32_t status) { m_commandMutex.lock(); m_commandTimeoutTimer.stop(); auto lastCommand = m_executedCommand; m_executedCommand = nullptr; // разблокируем mutex, так как внутри слота AnswerIsReady может быть добавлена новая команда в очередь m_commandMutex.unlock(); if (status != DeviceCommand::EStatus::OK) { emit signalCommandError(lastCommand->getName(), lastCommand->getBytes(), answer, status); } emit lastCommand->answerIsReady(lastCommand, answer, status); m_commandMutex.lock(); if (m_commands.size() > 0) { executeNextCommand(); } m_commandMutex.unlock(); } // [Thread: owned ByteCommandQueue] void handleCommandTimeout() { Common::JTerminal() << __PRETTY_FUNCTION__ << "****" << '\n'; handleCommandAnswer(std::vector(), DeviceCommand::EStatus::TIMEOUT); } private: // [Thread: owned ByteCommandQueue] void closeExecutedCommand(DeviceCommand::EStatus status) { m_commandTimeoutTimer.stop(); if (m_executedCommand != nullptr) { auto lastCommand = m_executedCommand; m_executedCommand = nullptr; emit lastCommand->answerIsReady(lastCommand, std::vector(), status); } } // [Thread: owned ByteCommandQueue] void handleCommandAnswer(std::vector answer, int32_t status) { m_commandTimeoutTimer.stop(); auto lastCommand = m_executedCommand; m_executedCommand = nullptr; if (status != DeviceCommand::EStatus::OK) { emit signalCommandError(lastCommand->getName(), lastCommand->getBytes(), answer, status); } emit lastCommand->answerIsReady(lastCommand, answer, status); if (m_commands.size() > 0) { executeNextCommand(); } } // [Thread: owned ByteCommandQueue] void executeNextCommand() { if ((m_commands.size() == 0) || (m_executedCommand != nullptr)) { return; } m_executedCommand = m_commands.front(); m_commands.pop(); if (m_isDeviceConnected.load()) { std::vector commandBytes = m_executedCommand->getBytes(); sendCommand(std::move(commandBytes), m_executedCommand->getTimeoutInterval()); } else { handleCommandAnswer(std::vector(), DeviceCommand::EStatus::DEVICE_DISCONNECTED); } } // [Thread: owned ByteCommandQueue] void sendCommand(std::vector&& data, int timeoutInterval) { // посылка команды // за счет подачи команды через signal Qt::QueuedConnection образуется очередь команд // при внешних задержках основного потока передача коменд их очереди может быть неравномерна // команды могут приходить парой!!! // поэтому необходимо организовать цикл ожидания и допустить отказ от исполнения команды // для этого мы изображаем TimeOut if (timeoutInterval < 0) { return; } int i = 0; for (; (i < timeoutInterval) && m_cmdWriter->getIsTransferred(); i++) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } if (i == timeoutInterval) { Common::JTerminal() << __PRETTY_FUNCTION__ << " Can't send command - transfer is busy!" << '\n'; handleCommandAnswer(std::vector(), DeviceCommand::EStatus::TIMEOUT); } else { m_cmdWriter->setData(std::move(data)); m_commandTimeoutTimer.setInterval(timeoutInterval); m_commandTimeoutTimer.start(); } } }; } // namespace Incart::Usb