001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.util;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.Map;
022import java.util.concurrent.TimeUnit;
023import java.util.concurrent.atomic.AtomicBoolean;
024
025import org.apache.activemq.broker.BrokerService;
026import org.apache.activemq.broker.SuppressReplyException;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.Queue;
029import org.apache.activemq.broker.region.RegionBroker;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * @org.apache.xbean.XBean
036 */
037 public class DefaultIOExceptionHandler implements IOExceptionHandler {
038
039    private static final Logger LOG = LoggerFactory
040            .getLogger(DefaultIOExceptionHandler.class);
041    protected BrokerService broker;
042    private boolean ignoreAllErrors = false;
043    private boolean ignoreNoSpaceErrors = true;
044    private boolean ignoreSQLExceptions = true;
045    private boolean stopStartConnectors = false;
046    private String noSpaceMessage = "space";
047    private String sqlExceptionMessage = ""; // match all
048    private long resumeCheckSleepPeriod = 5*1000;
049    private final AtomicBoolean handlingException = new AtomicBoolean(false);
050    private boolean systemExitOnShutdown = false;
051
052    @Override
053    public void handle(IOException exception) {
054        if (ignoreAllErrors) {
055            LOG.info("Ignoring IO exception, " + exception, exception);
056            return;
057        }
058
059        if (ignoreNoSpaceErrors) {
060            Throwable cause = exception;
061            while (cause != null && cause instanceof IOException) {
062                String message = cause.getMessage();
063                if (message != null && message.contains(noSpaceMessage)) {
064                    LOG.info("Ignoring no space left exception, " + exception, exception);
065                    return;
066                }
067                cause = cause.getCause();
068            }
069        }
070
071        if (ignoreSQLExceptions) {
072            Throwable cause = exception;
073            while (cause != null) {
074                if (cause instanceof SQLException) {
075                    String message = cause.getMessage();
076
077                    if (message == null) {
078                        message = "";
079                    }
080
081                    if (message.contains(sqlExceptionMessage)) {
082                        LOG.info("Ignoring SQLException, " + exception, cause);
083                        return;
084                    }
085                }
086                cause = cause.getCause();
087            }
088        }
089
090        if (stopStartConnectors) {
091            if (handlingException.compareAndSet(false, true)) {
092                LOG.info("Initiating stop/restart of transports on " + broker + " due to IO exception, " + exception, exception);
093
094                new Thread("IOExceptionHandler: stop transports") {
095                    @Override
096                    public void run() {
097                        try {
098                            ServiceStopper stopper = new ServiceStopper();
099                            broker.stopAllConnectors(stopper);
100                            LOG.info("Successfully stopped transports on " + broker);
101                        } catch (Exception e) {
102                            LOG.warn("Failure occurred while stopping broker connectors", e);
103                        } finally {
104                            // resume again
105                            new Thread("IOExceptionHandler: restart transports") {
106                                @Override
107                                public void run() {
108                                    try {
109                                        while (hasLockOwnership() && isPersistenceAdapterDown()) {
110                                            LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
111                                            TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
112                                        }
113                                        if (hasLockOwnership()) {
114                                            Map<ActiveMQDestination, Destination> destinations = ((RegionBroker)broker.getRegionBroker()).getDestinationMap();
115                                            for (Destination destination : destinations.values()) {
116
117                                                if (destination instanceof Queue) {
118                                                    Queue queue = (Queue)destination;
119                                                    if (queue.isResetNeeded()) {
120                                                        queue.clearPendingMessages();
121                                                    }
122                                                }
123                                            }
124                                            broker.startAllConnectors();
125                                            LOG.info("Successfully restarted transports on " + broker);
126                                        }
127                                    } catch (Exception e) {
128                                        LOG.warn("Stopping " + broker + " due to failure restarting transports", e);
129                                        stopBroker(e);
130                                    } finally {
131                                        handlingException.compareAndSet(true, false);
132                                    }
133                                }
134
135                                private boolean isPersistenceAdapterDown() {
136                                    boolean checkpointSuccess = false;
137                                    try {
138                                        broker.getPersistenceAdapter().checkpoint(true);
139                                        checkpointSuccess = true;
140                                    } catch (Throwable ignored) {
141                                    }
142                                    return !checkpointSuccess;
143                                }
144                            }.start();
145
146
147                        }
148                    }
149                }.start();
150            }
151
152            throw new SuppressReplyException("Stop/RestartTransportsInitiated", exception);
153        }
154
155        if (handlingException.compareAndSet(false, true)) {
156            stopBroker(exception);
157        }
158
159        // we don't want to propagate the exception back to the client
160        // They will see a delay till they see a disconnect via socket.close
161        // at which point failover: can kick in.
162        throw new SuppressReplyException("ShutdownBrokerInitiated", exception);
163    }
164
165    private void stopBroker(Exception exception) {
166        LOG.info("Stopping " + broker + " due to exception, " + exception, exception);
167        new Thread("IOExceptionHandler: stopping " + broker) {
168            @Override
169            public void run() {
170                try {
171                    if( broker.isRestartAllowed() ) {
172                        broker.requestRestart();
173                    }
174                    broker.setSystemExitOnShutdown(isSystemExitOnShutdown());
175                    broker.stop();
176                } catch (Exception e) {
177                    LOG.warn("Failure occurred while stopping broker", e);
178                }
179            }
180        }.start();
181    }
182
183    protected boolean hasLockOwnership() throws IOException {
184        return true;
185    }
186
187    @Override
188    public void setBrokerService(BrokerService broker) {
189        this.broker = broker;
190    }
191
192    public boolean isIgnoreAllErrors() {
193        return ignoreAllErrors;
194    }
195
196    public void setIgnoreAllErrors(boolean ignoreAllErrors) {
197        this.ignoreAllErrors = ignoreAllErrors;
198    }
199
200    public boolean isIgnoreNoSpaceErrors() {
201        return ignoreNoSpaceErrors;
202    }
203
204    public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
205        this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
206    }
207
208    public String getNoSpaceMessage() {
209        return noSpaceMessage;
210    }
211
212    public void setNoSpaceMessage(String noSpaceMessage) {
213        this.noSpaceMessage = noSpaceMessage;
214    }
215
216    public boolean isIgnoreSQLExceptions() {
217        return ignoreSQLExceptions;
218    }
219
220    public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
221        this.ignoreSQLExceptions = ignoreSQLExceptions;
222    }
223
224    public String getSqlExceptionMessage() {
225        return sqlExceptionMessage;
226    }
227
228    public void setSqlExceptionMessage(String sqlExceptionMessage) {
229        this.sqlExceptionMessage = sqlExceptionMessage;
230    }
231
232    public boolean isStopStartConnectors() {
233        return stopStartConnectors;
234    }
235
236    public void setStopStartConnectors(boolean stopStartConnectors) {
237        this.stopStartConnectors = stopStartConnectors;
238    }
239
240    public long getResumeCheckSleepPeriod() {
241        return resumeCheckSleepPeriod;
242    }
243
244    public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
245        this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
246    }
247
248    public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
249        this.systemExitOnShutdown = systemExitOnShutdown;
250    }
251
252    public boolean isSystemExitOnShutdown() {
253        return systemExitOnShutdown;
254    }
255}