001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.util; 018 019import java.io.IOException; 020import java.sql.SQLException; 021import java.util.Map; 022import java.util.concurrent.TimeUnit; 023import java.util.concurrent.atomic.AtomicBoolean; 024 025import org.apache.activemq.broker.BrokerService; 026import org.apache.activemq.broker.SuppressReplyException; 027import org.apache.activemq.broker.region.Destination; 028import org.apache.activemq.broker.region.Queue; 029import org.apache.activemq.broker.region.RegionBroker; 030import org.apache.activemq.command.ActiveMQDestination; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * @org.apache.xbean.XBean 036 */ 037 public class DefaultIOExceptionHandler implements IOExceptionHandler { 038 039 private static final Logger LOG = LoggerFactory 040 .getLogger(DefaultIOExceptionHandler.class); 041 protected BrokerService broker; 042 private boolean ignoreAllErrors = false; 043 private boolean ignoreNoSpaceErrors = true; 044 private boolean ignoreSQLExceptions = true; 045 private boolean stopStartConnectors = false; 046 private String noSpaceMessage = "space"; 047 private String sqlExceptionMessage = ""; // match all 048 private long resumeCheckSleepPeriod = 5*1000; 049 private final AtomicBoolean handlingException = new AtomicBoolean(false); 050 private boolean systemExitOnShutdown = false; 051 052 @Override 053 public void handle(IOException exception) { 054 if (ignoreAllErrors) { 055 LOG.info("Ignoring IO exception, " + exception, exception); 056 return; 057 } 058 059 if (ignoreNoSpaceErrors) { 060 Throwable cause = exception; 061 while (cause != null && cause instanceof IOException) { 062 String message = cause.getMessage(); 063 if (message != null && message.contains(noSpaceMessage)) { 064 LOG.info("Ignoring no space left exception, " + exception, exception); 065 return; 066 } 067 cause = cause.getCause(); 068 } 069 } 070 071 if (ignoreSQLExceptions) { 072 Throwable cause = exception; 073 while (cause != null) { 074 if (cause instanceof SQLException) { 075 String message = cause.getMessage(); 076 077 if (message == null) { 078 message = ""; 079 } 080 081 if (message.contains(sqlExceptionMessage)) { 082 LOG.info("Ignoring SQLException, " + exception, cause); 083 return; 084 } 085 } 086 cause = cause.getCause(); 087 } 088 } 089 090 if (stopStartConnectors) { 091 if (handlingException.compareAndSet(false, true)) { 092 LOG.info("Initiating stop/restart of transports on " + broker + " due to IO exception, " + exception, exception); 093 094 new Thread("IOExceptionHandler: stop transports") { 095 @Override 096 public void run() { 097 try { 098 ServiceStopper stopper = new ServiceStopper(); 099 broker.stopAllConnectors(stopper); 100 LOG.info("Successfully stopped transports on " + broker); 101 } catch (Exception e) { 102 LOG.warn("Failure occurred while stopping broker connectors", e); 103 } finally { 104 // resume again 105 new Thread("IOExceptionHandler: restart transports") { 106 @Override 107 public void run() { 108 try { 109 while (hasLockOwnership() && isPersistenceAdapterDown()) { 110 LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports"); 111 TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod); 112 } 113 if (hasLockOwnership()) { 114 Map<ActiveMQDestination, Destination> destinations = ((RegionBroker)broker.getRegionBroker()).getDestinationMap(); 115 for (Destination destination : destinations.values()) { 116 117 if (destination instanceof Queue) { 118 Queue queue = (Queue)destination; 119 if (queue.isResetNeeded()) { 120 queue.clearPendingMessages(); 121 } 122 } 123 } 124 broker.startAllConnectors(); 125 LOG.info("Successfully restarted transports on " + broker); 126 } 127 } catch (Exception e) { 128 LOG.warn("Stopping " + broker + " due to failure restarting transports", e); 129 stopBroker(e); 130 } finally { 131 handlingException.compareAndSet(true, false); 132 } 133 } 134 135 private boolean isPersistenceAdapterDown() { 136 boolean checkpointSuccess = false; 137 try { 138 broker.getPersistenceAdapter().checkpoint(true); 139 checkpointSuccess = true; 140 } catch (Throwable ignored) { 141 } 142 return !checkpointSuccess; 143 } 144 }.start(); 145 146 147 } 148 } 149 }.start(); 150 } 151 152 throw new SuppressReplyException("Stop/RestartTransportsInitiated", exception); 153 } 154 155 if (handlingException.compareAndSet(false, true)) { 156 stopBroker(exception); 157 } 158 159 // we don't want to propagate the exception back to the client 160 // They will see a delay till they see a disconnect via socket.close 161 // at which point failover: can kick in. 162 throw new SuppressReplyException("ShutdownBrokerInitiated", exception); 163 } 164 165 private void stopBroker(Exception exception) { 166 LOG.info("Stopping " + broker + " due to exception, " + exception, exception); 167 new Thread("IOExceptionHandler: stopping " + broker) { 168 @Override 169 public void run() { 170 try { 171 if( broker.isRestartAllowed() ) { 172 broker.requestRestart(); 173 } 174 broker.setSystemExitOnShutdown(isSystemExitOnShutdown()); 175 broker.stop(); 176 } catch (Exception e) { 177 LOG.warn("Failure occurred while stopping broker", e); 178 } 179 } 180 }.start(); 181 } 182 183 protected boolean hasLockOwnership() throws IOException { 184 return true; 185 } 186 187 @Override 188 public void setBrokerService(BrokerService broker) { 189 this.broker = broker; 190 } 191 192 public boolean isIgnoreAllErrors() { 193 return ignoreAllErrors; 194 } 195 196 public void setIgnoreAllErrors(boolean ignoreAllErrors) { 197 this.ignoreAllErrors = ignoreAllErrors; 198 } 199 200 public boolean isIgnoreNoSpaceErrors() { 201 return ignoreNoSpaceErrors; 202 } 203 204 public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) { 205 this.ignoreNoSpaceErrors = ignoreNoSpaceErrors; 206 } 207 208 public String getNoSpaceMessage() { 209 return noSpaceMessage; 210 } 211 212 public void setNoSpaceMessage(String noSpaceMessage) { 213 this.noSpaceMessage = noSpaceMessage; 214 } 215 216 public boolean isIgnoreSQLExceptions() { 217 return ignoreSQLExceptions; 218 } 219 220 public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) { 221 this.ignoreSQLExceptions = ignoreSQLExceptions; 222 } 223 224 public String getSqlExceptionMessage() { 225 return sqlExceptionMessage; 226 } 227 228 public void setSqlExceptionMessage(String sqlExceptionMessage) { 229 this.sqlExceptionMessage = sqlExceptionMessage; 230 } 231 232 public boolean isStopStartConnectors() { 233 return stopStartConnectors; 234 } 235 236 public void setStopStartConnectors(boolean stopStartConnectors) { 237 this.stopStartConnectors = stopStartConnectors; 238 } 239 240 public long getResumeCheckSleepPeriod() { 241 return resumeCheckSleepPeriod; 242 } 243 244 public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) { 245 this.resumeCheckSleepPeriod = resumeCheckSleepPeriod; 246 } 247 248 public void setSystemExitOnShutdown(boolean systemExitOnShutdown) { 249 this.systemExitOnShutdown = systemExitOnShutdown; 250 } 251 252 public boolean isSystemExitOnShutdown() { 253 return systemExitOnShutdown; 254 } 255}