001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.disk.journal;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025
026/**
027 * Used to pool DataFileAccessors.
028 *
029 * @author chirino
030 */
031public class DataFileAccessorPool {
032
033    private final Journal journal;
034    private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
035    private boolean closed;
036    private int maxOpenReadersPerFile = 5;
037
038    class Pool {
039
040        private final DataFile file;
041        private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
042        private int openCounter;
043        private boolean disposed;
044
045        public Pool(DataFile file) {
046            this.file = file;
047        }
048
049        public DataFileAccessor openDataFileReader() throws IOException {
050            DataFileAccessor rc = null;
051            if (pool.isEmpty()) {
052                rc = new DataFileAccessor(journal, file);
053            } else {
054                rc = pool.remove(pool.size() - 1);
055            }
056            openCounter++;
057            return rc;
058        }
059
060        public synchronized void closeDataFileReader(DataFileAccessor reader) {
061            openCounter--;
062            if (pool.size() >= maxOpenReadersPerFile || disposed) {
063                reader.dispose();
064            } else {
065                pool.add(reader);
066            }
067        }
068
069        public synchronized boolean isUsed() {
070            return openCounter > 0;
071        }
072
073        public synchronized void dispose() {
074            for (DataFileAccessor reader : pool) {
075                reader.dispose();
076            }
077            pool.clear();
078            disposed = true;
079        }
080
081        public synchronized int getOpenCounter() {
082            return openCounter;
083        }
084
085    }
086
087    public DataFileAccessorPool(Journal dataManager) {
088        this.journal = dataManager;
089    }
090
091    public synchronized int size() {
092        return pools.size();
093    }
094
095    public synchronized void disposeUnused() {
096        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
097            Pool pool = iter.next();
098            if (!pool.isUsed()) {
099                pool.dispose();
100                iter.remove();
101            }
102        }
103    }
104
105    synchronized void disposeDataFileAccessors(DataFile dataFile) {
106        if (closed) {
107            throw new IllegalStateException("Closed.");
108        }
109        Pool pool = pools.get(dataFile.getDataFileId());
110        if (pool != null) {
111            if (pool.getOpenCounter() == 0) {
112                pool.dispose();
113                pools.remove(dataFile.getDataFileId());
114            } else {
115                throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter());
116            }
117        }
118    }
119
120    synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
121        if (closed) {
122            throw new IOException("Closed.");
123        }
124
125        Pool pool = pools.get(dataFile.getDataFileId());
126        if (pool == null) {
127            pool = new Pool(dataFile);
128            pools.put(dataFile.getDataFileId(), pool);
129        }
130        return pool.openDataFileReader();
131    }
132
133    synchronized void closeDataFileAccessor(DataFileAccessor reader) {
134        Pool pool = pools.get(reader.getDataFile().getDataFileId());
135        if (pool == null || closed) {
136            reader.dispose();
137        } else {
138            pool.closeDataFileReader(reader);
139        }
140    }
141
142    public synchronized void close() {
143        if (closed) {
144            return;
145        }
146        closed = true;
147        for (Pool pool : pools.values()) {
148            pool.dispose();
149        }
150        pools.clear();
151    }
152
153}