当前位置:编程学习 > html/css >>

文件队列 QueueFile

/** * Copyright (C) 2010 Square, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.squareup.util; import com.squareup.Square; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.NoSuchElementException; /** * A reliable, efficient, file-based, FIFO queue. Additions and removals are * O(1). All operations are atomic. Writes are synchronous; data will be * written to disk before an operation returns. The underlying file is * structured to survive process and even system crashes. If an I/O exception * is thrown during a mutating change, the change is aborted. It is safe to * continue to use a {@code QueueFile} instance after an exception. * * <p>All operations are synchronized. In a traditional queue, the remove * operation returns an element. In this queue, {@link #peek} and {@link * #remove} are used in conjunction. Use {@code peek} to retrieve the first * element, and then {@code remove} to remove it after successful processing. * If the system crashes after {@code peek} and during processing, the element * will remain in the queue, to be processed when the system restarts. * * <p><b><font color="red">NOTE:</font></b> The current implementation is * built for file systems that support atomic segment writes (like YAFFS). 
* Most conventional file systems don't support this; if the power goes out * while writing a segment, the segment will contain garbage and the file will * be corrupt. We'll add journaling support so this class can be used with * more file systems later. * * @author Bob Lee (bob@squareup.com) */ public class QueueFile { /** Initial file size in bytes. */ private static final int INITIAL_LENGTH = 4096; // one file system block /** Length of header in bytes. */ static final int HEADER_LENGTH = 16; /** * The underlying file. Uses a ring buffer to store entries. Designed so * that a modification isn't committed or visible until we write the header. * The header is much smaller than a segment. So long as the underlying file * system supports atomic segment writes, changes to the queue are atomic. * Storing the file length ensures we can recover from a failed expansion * (i.e. if setting the file length succeeds but the process dies before the * data can be copied). * * <pre> * Format: * Header (16 bytes) * Element Ring Buffer (File Length - 16 bytes) * * Header: * File Length (4 bytes) * Element Count (4 bytes) * First Element Position (4 bytes, =0 if null) * Last Element Position (4 bytes, =0 if null) * * Element: * Length (4 bytes) * Data (Length bytes) * </pre> */ private final RandomAccessFile raf; /** Cached file length. Always a power of 2. */ int fileLength; /** Number of elements. */ private int elementCount; /** Pointer to first (or eldest) element. */ private Element first; /** Pointer to last (or newest) element. */ private Element last; /** In-memory buffer. Big enough to hold the header. */ private final byte[] buffer = new byte[16]; /** * Constructs a new queue backed by the given file. Only one {@code QueueFile} * instance should access a given file at a time. */ public QueueFile(File file) throws IOException { if (!file.exists()) initialize(file); raf = open(file); readHeader(); } /** For testing. 
*/ QueueFile(RandomAccessFile raf) throws IOException { this.raf = raf; readHeader(); } /** * Stores int in buffer. The behavior is equivalent to calling * {@link RandomAccessFile#writeInt}. */ private static void writeInt(byte[] buffer, int offset, int value) { buffer[offset] = (byte) (value >> 24); buffer[offset + 1] = (byte) (value >> 16); buffer[offset + 2] = (byte) (value >> 8); buffer[offset + 3] = (byte) value; } /** * Stores int values in buffer. The behavior is equivalent to calling * {@link RandomAccessFile#writeInt} for each value. */ private static void writeInts(byte[] buffer, int... values) { int offset = 0; for (int value : values) { writeInt(buffer, offset, value); offset += 4; } } /** * Reads an int from a byte[]. */ private static int readInt(byte[] buffer, int offset) { return ((buffer[offset] & 0xff) << 24) + ((buffer[offset + 1] & 0xff) << 16) + ((buffer[offset + 2] & 0xff) << 8) + (buffer[offset + 3] & 0xff); } /** * Reads the header. */ private void readHeader() throws IOException { raf.seek(0); raf.readFully(buffer); fileLength = readInt(buffer, 0); elementCount = readInt(buffer, 4); int firstOffset = readInt(buffer, 8); int lastOffset = readInt(buffer, 12); first = readElement(firstOffset); last = readElement(lastOffset); } /** * Writes header atomically. The arguments contain the updated values. The * class member fields should not have changed yet. This only updates the * state in the file. It's up to the caller to update the class member * variables *after* this call succeeds. Assumes segment writes are atomic * in the underlying file system. */ private void writeHeader(int fileLength, int elementCount, int firstPosition, int lastPosition) throws IOException { writeInts(buffer, fileLength, elementCount, firstPosition, lastPosition); raf.seek(0); raf.write(buffer); } /** * Returns the Element for the given offset. 
*/ private Element readElement(int position) throws IOException { if (position == 0) return Element.NULL; raf.seek(position); return new Element(position, raf.readInt()); } /** Atomically initializes a new file. */ private static void initialize(File file) throws IOException { // Use a temp file so we don't leave a partially-initialized file. File tempFile = new File(file.getPath() + ".tmp"); RandomAccessFile raf = open(tempFile); try { raf.setLength(INITIAL_LENGTH); raf.seek(0); byte[] headerBuffer = new byte[16]; writeInts(headerBuffer, INITIAL_LENGTH, 0, 0, 0); raf.write(headerBuffer); } finally { raf.close(); } // A rename is atomic. if (!tempFile.renameTo(file)) throw new IOException("Rename failed!"); } /** * Opens a random access file that writes synchronously. */ private static RandomAccessFile open(File file) throws FileNotFoundException { return new RandomAccessFile(file, "rwd"); } /** * Wraps the position if it exceeds the end of the file. */ private int wrapPosition(int position) { return position < fileLength ? position : HEADER_LENGTH + position - fileLength; } /** * Writes count bytes from buffer to position in file. Automatically wraps * write if position is past the end of the file or if buffer overlaps it. * * @param position in file to write to * @param buffer to write from * @param count # of bytes to write */ private void ringWrite(int position, byte[] buffer, int offset, int count) throws IOException { position = wrapPosition(position); if (position + count <= fileLength) { raf.seek(position); raf.write(buffer, offset, count); } else { // The write overlaps the EOF. // # of bytes to write before the EOF. int beforeEof = fileLength - position; raf.seek(position); raf.write(buffer, offset, beforeEof); raf.seek(HEADER_LENGTH); raf.write(buffer, offset + beforeEof, count - beforeEof); } } /** * Reads count bytes into buffer from file. Wraps if n
补充:web前端 , HTML/CSS  ,
Copyright © 2012 站长网 编程知识问答 www.zzzyk.com All Rights Reserved
部分技术文章来自网络,