// File queue: QueueFile
/**
* Copyright (C) 2010 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.util;
import com.squareup.Square;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.NoSuchElementException;
/**
* A reliable, efficient, file-based, FIFO queue. Additions and removals are
* O(1). All operations are atomic. Writes are synchronous; data will be
* written to disk before an operation returns. The underlying file is
* structured to survive process and even system crashes. If an I/O exception
* is thrown during a mutating change, the change is aborted. It is safe to
* continue to use a {@code QueueFile} instance after an exception.
*
* <p>All operations are synchronized. In a traditional queue, the remove
* operation returns an element. In this queue, {@link #peek} and {@link
* #remove} are used in conjunction. Use {@code peek} to retrieve the first
* element, and then {@code remove} to remove it after successful processing.
* If the system crashes after {@code peek} and during processing, the element
* will remain in the queue, to be processed when the system restarts.
*
* <p><b><font color="red">NOTE:</font></b> The current implementation is
* built for file systems that support atomic segment writes (like YAFFS).
* Most conventional file systems don't support this; if the power goes out
* while writing a segment, the segment will contain garbage and the file will
* be corrupt. We'll add journaling support so this class can be used with
* more file systems later.
*
* @author Bob Lee (bob@squareup.com)
*/
public class QueueFile {
/** Initial file size in bytes. */
private static final int INITIAL_LENGTH = 4096; // one file system block

/** Length of header in bytes. */
static final int HEADER_LENGTH = 16;

/**
 * The underlying file. Uses a ring buffer to store entries. Designed so
 * that a modification isn't committed or visible until we write the header.
 * The header is much smaller than a segment. So long as the underlying file
 * system supports atomic segment writes, changes to the queue are atomic.
 * Storing the file length ensures we can recover from a failed expansion
 * (i.e. if setting the file length succeeds but the process dies before the
 * data can be copied).
 *
 * <pre>
 *   Format:
 *     Header              (16 bytes)
 *     Element Ring Buffer (File Length - 16 bytes)
 *
 *   Header:
 *     File Length            (4 bytes)
 *     Element Count          (4 bytes)
 *     First Element Position (4 bytes, =0 if null)
 *     Last Element Position  (4 bytes, =0 if null)
 *
 *   Element:
 *     Length (4 bytes)
 *     Data   (Length bytes)
 * </pre>
 */
private final RandomAccessFile raf;

/** Cached file length. Always a power of 2. */
int fileLength;

/** Number of elements. */
private int elementCount;

/** Pointer to first (or eldest) element. */
private Element first;

/** Pointer to last (or newest) element. */
private Element last;

/**
 * In-memory scratch buffer, sized from {@link #HEADER_LENGTH} so it always
 * holds exactly one header.
 */
private final byte[] buffer = new byte[HEADER_LENGTH];
/**
* Constructs a new queue backed by the given file. Only one {@code QueueFile}
* instance should access a given file at a time.
*/
public QueueFile(File file) throws IOException {
if (!file.exists()) initialize(file);
raf = open(file);
readHeader();
}
/**
 * For testing. Adopts an already-open file and loads its header.
 *
 * @param raf the open file to use as backing storage
 * @throws IOException if the header cannot be read
 */
QueueFile(RandomAccessFile raf) throws IOException {
  this.raf = raf;
  readHeader();
}
/**
 * Stores a 32-bit value into {@code buffer} as four bytes, most significant
 * byte first, starting at {@code offset}. The byte layout matches
 * {@link RandomAccessFile#writeInt}.
 *
 * @param buffer destination array
 * @param offset index of the first byte to write
 * @param value the int to encode
 */
private static void writeInt(byte[] buffer, int offset, int value) {
  // Walk from the high byte down to the low byte.
  for (int i = 0; i < 4; i++) {
    int shift = 24 - (i * 8);
    buffer[offset + i] = (byte) (value >> shift);
  }
}
/**
 * Packs the given ints back to back at the start of {@code buffer}, four
 * bytes each. The result is the same as calling
 * {@link RandomAccessFile#writeInt} once per value.
 *
 * @param buffer destination array; needs room for {@code 4 * values.length} bytes
 * @param values the ints to encode, in order
 */
private static void writeInts(byte[] buffer, int... values) {
  for (int i = 0; i < values.length; i++) {
    writeInt(buffer, i * 4, values[i]);
  }
}
/**
 * Decodes four bytes of {@code buffer}, most significant byte first, into an
 * int.
 *
 * @param buffer source array
 * @param offset index of the first (most significant) byte
 * @return the decoded value
 */
private static int readInt(byte[] buffer, int offset) {
  int result = 0;
  for (int i = 0; i < 4; i++) {
    // Mask to drop sign extension before merging the byte in.
    result = (result << 8) | (buffer[offset + i] & 0xff);
  }
  return result;
}
/**
 * Loads the header from the start of the file into the in-memory fields:
 * file length, element count, and the first and last elements.
 *
 * @throws IOException if the header or either element cannot be read
 */
private void readHeader() throws IOException {
  raf.seek(0);
  raf.readFully(buffer);

  // Header layout: four consecutive ints.
  fileLength = readInt(buffer, 0);
  elementCount = readInt(buffer, 4);
  first = readElement(readInt(buffer, 8));
  last = readElement(readInt(buffer, 12));
}
/**
 * Writes the header to the start of the file. The arguments are the new
 * values; this method only changes what is stored in the file and does not
 * touch the member fields. Callers update those fields themselves once this
 * call has returned successfully. Atomicity of the update depends on the
 * underlying file system writing a segment atomically.
 *
 * @param fileLength new file length
 * @param elementCount new element count
 * @param firstPosition position of the first element
 * @param lastPosition position of the last element
 * @throws IOException if seeking or writing fails
 */
private void writeHeader(
    int fileLength, int elementCount, int firstPosition, int lastPosition)
    throws IOException {
  // Encode all four fields in memory first so that the file sees one write.
  writeInts(buffer, fileLength, elementCount, firstPosition, lastPosition);
  raf.seek(0);
  raf.write(buffer);
}
/**
 * Builds the {@code Element} located at the given file position by reading
 * the int stored there.
 *
 * @param position file position of the element; 0 yields {@code Element.NULL}
 * @return the element at {@code position}
 * @throws IOException if seeking or reading fails
 */
private Element readElement(int position) throws IOException {
  if (position != 0) {
    raf.seek(position);
    int length = raf.readInt();
    return new Element(position, length);
  }
  return Element.NULL;
}
/**
 * Creates a new, empty queue file at the given path.
 *
 * <p>The file is built under a temporary name ({@code <path>.tmp}) and then
 * renamed into place, so a partially-initialized file is not left at the
 * real path.
 * NOTE(review): this is only atomic if the platform's rename is atomic;
 * {@link File#renameTo} does not promise that everywhere.
 *
 * @param file the destination path
 * @throws IOException if writing fails or the rename does not succeed
 */
private static void initialize(File file) throws IOException {
  final File scratch = new File(file.getPath() + ".tmp");
  final RandomAccessFile out = open(scratch);
  try {
    out.setLength(INITIAL_LENGTH);
    out.seek(0);
    // Header for an empty queue: file length, then zero count and positions.
    final byte[] header = new byte[16];
    writeInts(header, INITIAL_LENGTH, 0, 0, 0);
    out.write(header);
  } finally {
    out.close();
  }

  final boolean renamed = scratch.renameTo(file);
  if (!renamed) {
    throw new IOException("Rename failed!");
  }
}
/**
 * Opens the given file for reading and writing in {@code "rwd"} mode, which
 * makes updates to the file's content synchronous.
 *
 * @param file the file to open
 * @return the opened file
 * @throws FileNotFoundException if the file cannot be opened
 */
private static RandomAccessFile open(File file) throws FileNotFoundException {
  final String mode = "rwd";
  return new RandomAccessFile(file, mode);
}
/**
 * Maps a position that runs past the end of the file back into the ring
 * buffer, which starts right after the header.
 *
 * @param position the position to wrap
 * @return {@code position} unchanged if it is below the file length,
 *     otherwise the wrapped position
 */
private int wrapPosition(int position) {
  if (position < fileLength) {
    return position;
  }
  // Skip the header when wrapping around.
  return HEADER_LENGTH + position - fileLength;
}
/**
 * Writes {@code count} bytes from {@code buffer} into the file at
 * {@code position}. The position is wrapped first, and a write that would
 * run past the end of the file is split: the first part goes up to the end
 * of the file and the rest continues right after the header.
 *
 * @param position in file to write to
 * @param buffer to write from
 * @param offset index in {@code buffer} of the first byte to write
 * @param count # of bytes to write
 * @throws IOException if seeking or writing fails
 */
private void ringWrite(int position, byte[] buffer, int offset, int count)
    throws IOException {
  final int start = wrapPosition(position);
  final boolean fitsBeforeEof = start + count <= fileLength;

  raf.seek(start);
  if (fitsBeforeEof) {
    raf.write(buffer, offset, count);
    return;
  }

  // The write straddles the end of the file: fill the tail, then wrap.
  final int tailBytes = fileLength - start;
  raf.write(buffer, offset, tailBytes);
  raf.seek(HEADER_LENGTH);
  raf.write(buffer, offset + tailBytes, count - tailBytes);
}
/**
* Reads count bytes into buffer from file. Wraps if n
补充:web前端 , HTML/CSS ,