cobra bots: handle stalled connection by disconnecting and reconnecting instead of quitting, and expecting kubernete to restart us
This commit is contained in:
		@@ -44,6 +44,7 @@ namespace ix
 | 
				
			|||||||
        std::atomic<bool> stop(false);
 | 
					        std::atomic<bool> stop(false);
 | 
				
			||||||
        std::atomic<bool> throttled(false);
 | 
					        std::atomic<bool> throttled(false);
 | 
				
			||||||
        std::atomic<bool> fatalCobraError(false);
 | 
					        std::atomic<bool> fatalCobraError(false);
 | 
				
			||||||
 | 
					        std::atomic<bool> stalledConnection(false);
 | 
				
			||||||
        int minuteCounter = 0;
 | 
					        int minuteCounter = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        auto timer = [&sentCount,
 | 
					        auto timer = [&sentCount,
 | 
				
			||||||
@@ -95,7 +96,13 @@ namespace ix
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        std::thread t1(timer);
 | 
					        std::thread t1(timer);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        auto heartbeat = [&sentCount, &receivedCount, &stop, &enableHeartbeat, &heartBeatTimeout, &fatalCobraError] {
 | 
					        auto heartbeat = [&sentCount,
 | 
				
			||||||
 | 
					                          &receivedCount,
 | 
				
			||||||
 | 
					                          &stop,
 | 
				
			||||||
 | 
					                          &enableHeartbeat,
 | 
				
			||||||
 | 
					                          &heartBeatTimeout,
 | 
				
			||||||
 | 
					                          &stalledConnection]
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
            setThreadName("Bot heartbeat");
 | 
					            setThreadName("Bot heartbeat");
 | 
				
			||||||
            std::string state("na");
 | 
					            std::string state("na");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -111,9 +118,12 @@ namespace ix
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
                if (currentState == state)
 | 
					                if (currentState == state)
 | 
				
			||||||
                {
 | 
					                {
 | 
				
			||||||
                    CoreLogger::error("no messages received or sent for 1 minute, exiting");
 | 
					                    ss.str("");
 | 
				
			||||||
                    fatalCobraError = true;
 | 
					                    ss << "no messages received or sent for "
 | 
				
			||||||
                    break;
 | 
					                       << heartBeatTimeout << " seconds, reconnecting";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    CoreLogger::error(ss.str());
 | 
				
			||||||
 | 
					                    stalledConnection = true;
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                state = currentState;
 | 
					                state = currentState;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -234,6 +244,13 @@ namespace ix
 | 
				
			|||||||
                std::this_thread::sleep_for(duration);
 | 
					                std::this_thread::sleep_for(duration);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if (fatalCobraError) break;
 | 
					                if (fatalCobraError) break;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                if (stalledConnection)
 | 
				
			||||||
 | 
					                {
 | 
				
			||||||
 | 
					                    conn.disconnect();
 | 
				
			||||||
 | 
					                    conn.connect();
 | 
				
			||||||
 | 
					                    stalledConnection = false;
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        // Run for a duration, used by unittesting now
 | 
					        // Run for a duration, used by unittesting now
 | 
				
			||||||
@@ -245,6 +262,13 @@ namespace ix
 | 
				
			|||||||
                std::this_thread::sleep_for(duration);
 | 
					                std::this_thread::sleep_for(duration);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if (fatalCobraError) break;
 | 
					                if (fatalCobraError) break;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                if (stalledConnection)
 | 
				
			||||||
 | 
					                {
 | 
				
			||||||
 | 
					                    conn.disconnect();
 | 
				
			||||||
 | 
					                    conn.connect();
 | 
				
			||||||
 | 
					                    stalledConnection = false;
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -123,7 +123,7 @@ namespace ix
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // periodically display all device ids
 | 
					        // periodically display all device ids
 | 
				
			||||||
        if (sentCount % 100 == 0)
 | 
					        if (sentCount % 1000 == 0)
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            ss.str(""); // reset the stringstream
 | 
					            ss.str(""); // reset the stringstream
 | 
				
			||||||
            ss << "## " << deviceIdCounters.size() << " unique device ids ##" << std::endl;
 | 
					            ss << "## " << deviceIdCounters.size() << " unique device ids ##" << std::endl;
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user