1 /* 2 * Copyright (C) 2014 Square, Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package okio; 17 18 import java.io.EOFException; 19 import java.io.IOException; 20 import java.io.InputStream; 21 import java.io.OutputStream; 22 import java.security.MessageDigest; 23 import java.security.NoSuchAlgorithmException; 24 import java.util.ArrayList; 25 import java.util.Collections; 26 import java.util.List; 27 28 import static okio.Util.UTF_8; 29 import static okio.Util.checkOffsetAndCount; 30 import static okio.Util.reverseBytesLong; 31 32 /** 33 * A collection of bytes in memory. 34 * 35 * <p><strong>Moving data from one OkBuffer to another is fast.</strong> Instead 36 * of copying bytes from one place in memory to another, this class just changes 37 * ownership of the underlying byte arrays. 38 * 39 * <p><strong>This buffer grows with your data.</strong> Just like ArrayList, 40 * each OkBuffer starts small. It consumes only the memory it needs to. 41 * 42 * <p><strong>This buffer pools its byte arrays.</strong> When you allocate a 43 * byte array in Java, the runtime must zero-fill the requested array before 44 * returning it to you. Even if you're going to write over that space anyway. 45 * This class avoids zero-fill and GC churn by pooling byte arrays. 46 */ 47 public final class OkBuffer implements BufferedSource, BufferedSink, Cloneable { 48 Segment head; 49 long size; 50 51 public OkBuffer() { 52 } 53 54 /** Returns the number of bytes currently in this buffer. */ 55 public long size() { 56 return size; 57 } 58 59 @Override public OkBuffer buffer() { 60 return this; 61 } 62 63 @Override public OutputStream outputStream() { 64 return new OutputStream() { 65 @Override public void write(int b) { 66 writeByte((byte) b); 67 } 68 69 @Override public void write(byte[] data, int offset, int byteCount) { 70 OkBuffer.this.write(data, offset, byteCount); 71 } 72 73 @Override public void flush() { 74 } 75 76 @Override public void close() { 77 } 78 79 @Override public String toString() { 80 return this + ".outputStream()"; 81 } 82 }; 83 } 84 85 @Override public OkBuffer emitCompleteSegments() { 86 return this; // Nowhere to emit to! 87 } 88 89 @Override public boolean exhausted() { 90 return size == 0; 91 } 92 93 @Override public void require(long byteCount) throws EOFException { 94 if (this.size < byteCount) throw new EOFException(); 95 } 96 97 @Override public InputStream inputStream() { 98 return new InputStream() { 99 @Override public int read() { 100 return readByte() & 0xff; 101 } 102 103 @Override public int read(byte[] sink, int offset, int byteCount) { 104 return OkBuffer.this.read(sink, offset, byteCount); 105 } 106 107 @Override public int available() { 108 return (int) Math.min(size, Integer.MAX_VALUE); 109 } 110 111 @Override public void close() { 112 } 113 114 @Override public String toString() { 115 return OkBuffer.this + ".inputStream()"; 116 } 117 }; 118 } 119 120 /** 121 * Returns the number of bytes in segments that are not writable. This is the 122 * number of bytes that can be flushed immediately to an underlying sink 123 * without harming throughput. 124 */ 125 public long completeSegmentByteCount() { 126 long result = size; 127 if (result == 0) return 0; 128 129 // Omit the tail if it's still writable. 130 Segment tail = head.prev; 131 if (tail.limit < Segment.SIZE) { 132 result -= tail.limit - tail.pos; 133 } 134 135 return result; 136 } 137 138 @Override public byte readByte() { 139 if (size == 0) throw new IllegalStateException("size == 0"); 140 141 Segment segment = head; 142 int pos = segment.pos; 143 int limit = segment.limit; 144 145 byte[] data = segment.data; 146 byte b = data[pos++]; 147 size -= 1; 148 149 if (pos == limit) { 150 head = segment.pop(); 151 SegmentPool.INSTANCE.recycle(segment); 152 } else { 153 segment.pos = pos; 154 } 155 156 return b; 157 } 158 159 /** Returns the byte at {@code pos}. */ 160 public byte getByte(long pos) { 161 checkOffsetAndCount(size, pos, 1); 162 for (Segment s = head; true; s = s.next) { 163 int segmentByteCount = s.limit - s.pos; 164 if (pos < segmentByteCount) return s.data[s.pos + (int) pos]; 165 pos -= segmentByteCount; 166 } 167 } 168 169 @Override public short readShort() { 170 if (size < 2) throw new IllegalStateException("size < 2: " + size); 171 172 Segment segment = head; 173 int pos = segment.pos; 174 int limit = segment.limit; 175 176 // If the short is split across multiple segments, delegate to readByte(). 177 if (limit - pos < 2) { 178 int s = (readByte() & 0xff) << 8 179 | (readByte() & 0xff); 180 return (short) s; 181 } 182 183 byte[] data = segment.data; 184 int s = (data[pos++] & 0xff) << 8 185 | (data[pos++] & 0xff); 186 size -= 2; 187 188 if (pos == limit) { 189 head = segment.pop(); 190 SegmentPool.INSTANCE.recycle(segment); 191 } else { 192 segment.pos = pos; 193 } 194 195 return (short) s; 196 } 197 198 @Override public int readInt() { 199 if (size < 4) throw new IllegalStateException("size < 4: " + size); 200 201 Segment segment = head; 202 int pos = segment.pos; 203 int limit = segment.limit; 204 205 // If the int is split across multiple segments, delegate to readByte(). 206 if (limit - pos < 4) { 207 return (readByte() & 0xff) << 24 208 | (readByte() & 0xff) << 16 209 | (readByte() & 0xff) << 8 210 | (readByte() & 0xff); 211 } 212 213 byte[] data = segment.data; 214 int i = (data[pos++] & 0xff) << 24 215 | (data[pos++] & 0xff) << 16 216 | (data[pos++] & 0xff) << 8 217 | (data[pos++] & 0xff); 218 size -= 4; 219 220 if (pos == limit) { 221 head = segment.pop(); 222 SegmentPool.INSTANCE.recycle(segment); 223 } else { 224 segment.pos = pos; 225 } 226 227 return i; 228 } 229 230 @Override public long readLong() { 231 if (size < 8) throw new IllegalStateException("size < 8: " + size); 232 233 Segment segment = head; 234 int pos = segment.pos; 235 int limit = segment.limit; 236 237 // If the long is split across multiple segments, delegate to readInt(). 238 if (limit - pos < 8) { 239 return (readInt() & 0xffffffffL) << 32 240 | (readInt() & 0xffffffffL); 241 } 242 243 byte[] data = segment.data; 244 long v = (data[pos++] & 0xffL) << 56 245 | (data[pos++] & 0xffL) << 48 246 | (data[pos++] & 0xffL) << 40 247 | (data[pos++] & 0xffL) << 32 248 | (data[pos++] & 0xffL) << 24 249 | (data[pos++] & 0xffL) << 16 250 | (data[pos++] & 0xffL) << 8 251 | (data[pos++] & 0xffL); 252 size -= 8; 253 254 if (pos == limit) { 255 head = segment.pop(); 256 SegmentPool.INSTANCE.recycle(segment); 257 } else { 258 segment.pos = pos; 259 } 260 261 return v; 262 } 263 264 @Override public short readShortLe() { 265 return Util.reverseBytesShort(readShort()); 266 } 267 268 @Override public int readIntLe() { 269 return Util.reverseBytesInt(readInt()); 270 } 271 272 @Override public long readLongLe() { 273 return Util.reverseBytesLong(readLong()); 274 } 275 276 @Override public ByteString readByteString(long byteCount) { 277 return new ByteString(readBytes(byteCount)); 278 } 279 280 @Override public String readUtf8(long byteCount) { 281 checkOffsetAndCount(this.size, 0, byteCount); 282 if (byteCount > Integer.MAX_VALUE) { 283 throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount); 284 } 285 if (byteCount == 0) return ""; 286 287 Segment head = this.head; 288 if (head.pos + byteCount > head.limit) { 289 // If the string spans multiple segments, delegate to readBytes(). 290 return new String(readBytes(byteCount), Util.UTF_8); 291 } 292 293 String result = new String(head.data, head.pos, (int) byteCount, UTF_8); 294 head.pos += byteCount; 295 this.size -= byteCount; 296 297 if (head.pos == head.limit) { 298 this.head = head.pop(); 299 SegmentPool.INSTANCE.recycle(head); 300 } 301 302 return result; 303 } 304 305 @Override public String readUtf8Line() throws IOException { 306 long newline = indexOf((byte) '\n'); 307 308 if (newline == -1) { 309 return size != 0 ? readUtf8(size) : null; 310 } 311 312 return readUtf8Line(newline); 313 } 314 315 @Override public String readUtf8LineStrict() throws IOException { 316 long newline = indexOf((byte) '\n'); 317 if (newline == -1) throw new EOFException(); 318 return readUtf8Line(newline); 319 } 320 321 String readUtf8Line(long newline) { 322 if (newline > 0 && getByte(newline - 1) == '\r') { 323 // Read everything until '\r\n', then skip the '\r\n'. 324 String result = readUtf8((newline - 1)); 325 skip(2); 326 return result; 327 328 } else { 329 // Read everything until '\n', then skip the '\n'. 330 String result = readUtf8(newline); 331 skip(1); 332 return result; 333 } 334 } 335 336 private byte[] readBytes(long byteCount) { 337 checkOffsetAndCount(this.size, 0, byteCount); 338 if (byteCount > Integer.MAX_VALUE) { 339 throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount); 340 } 341 342 int offset = 0; 343 byte[] result = new byte[(int) byteCount]; 344 345 while (offset < byteCount) { 346 int toCopy = (int) Math.min(byteCount - offset, head.limit - head.pos); 347 System.arraycopy(head.data, head.pos, result, offset, toCopy); 348 349 offset += toCopy; 350 head.pos += toCopy; 351 352 if (head.pos == head.limit) { 353 Segment toRecycle = head; 354 head = toRecycle.pop(); 355 SegmentPool.INSTANCE.recycle(toRecycle); 356 } 357 } 358 359 this.size -= byteCount; 360 return result; 361 } 362 363 /** Like {@link InputStream#read}. */ 364 int read(byte[] sink, int offset, int byteCount) { 365 Segment s = this.head; 366 if (s == null) return -1; 367 int toCopy = Math.min(byteCount, s.limit - s.pos); 368 System.arraycopy(s.data, s.pos, sink, offset, toCopy); 369 370 s.pos += toCopy; 371 this.size -= toCopy; 372 373 if (s.pos == s.limit) { 374 this.head = s.pop(); 375 SegmentPool.INSTANCE.recycle(s); 376 } 377 378 return toCopy; 379 } 380 381 /** 382 * Discards all bytes in this buffer. Calling this method when you're done 383 * with a buffer will return its segments to the pool. 384 */ 385 public void clear() { 386 skip(size); 387 } 388 389 /** Discards {@code byteCount} bytes from the head of this buffer. */ 390 @Override public void skip(long byteCount) { 391 checkOffsetAndCount(this.size, 0, byteCount); 392 393 this.size -= byteCount; 394 while (byteCount > 0) { 395 int toSkip = (int) Math.min(byteCount, head.limit - head.pos); 396 byteCount -= toSkip; 397 head.pos += toSkip; 398 399 if (head.pos == head.limit) { 400 Segment toRecycle = head; 401 head = toRecycle.pop(); 402 SegmentPool.INSTANCE.recycle(toRecycle); 403 } 404 } 405 } 406 407 @Override public OkBuffer write(ByteString byteString) { 408 return write(byteString.data, 0, byteString.data.length); 409 } 410 411 @Override public OkBuffer writeUtf8(String string) { 412 // TODO: inline UTF-8 encoding to save allocating a byte[]? 413 byte[] data = string.getBytes(Util.UTF_8); 414 return write(data, 0, data.length); 415 } 416 417 @Override public OkBuffer write(byte[] source) { 418 return write(source, 0, source.length); 419 } 420 421 @Override public OkBuffer write(byte[] source, int offset, int byteCount) { 422 int limit = offset + byteCount; 423 while (offset < limit) { 424 Segment tail = writableSegment(1); 425 426 int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit); 427 System.arraycopy(source, offset, tail.data, tail.limit, toCopy); 428 429 offset += toCopy; 430 tail.limit += toCopy; 431 } 432 433 this.size += byteCount; 434 return this; 435 } 436 437 @Override public OkBuffer writeByte(int b) { 438 Segment tail = writableSegment(1); 439 tail.data[tail.limit++] = (byte) b; 440 size += 1; 441 return this; 442 } 443 444 @Override public OkBuffer writeShort(int s) { 445 Segment tail = writableSegment(2); 446 byte[] data = tail.data; 447 int limit = tail.limit; 448 data[limit++] = (byte) ((s >>> 8) & 0xff); 449 data[limit++] = (byte) (s & 0xff); 450 tail.limit = limit; 451 size += 2; 452 return this; 453 } 454 455 @Override public BufferedSink writeShortLe(int s) { 456 return writeShort(Util.reverseBytesShort((short) s)); 457 } 458 459 @Override public OkBuffer writeInt(int i) { 460 Segment tail = writableSegment(4); 461 byte[] data = tail.data; 462 int limit = tail.limit; 463 data[limit++] = (byte) ((i >>> 24) & 0xff); 464 data[limit++] = (byte) ((i >>> 16) & 0xff); 465 data[limit++] = (byte) ((i >>> 8) & 0xff); 466 data[limit++] = (byte) (i & 0xff); 467 tail.limit = limit; 468 size += 4; 469 return this; 470 } 471 472 @Override public BufferedSink writeIntLe(int i) { 473 return writeInt(Util.reverseBytesInt(i)); 474 } 475 476 @Override public OkBuffer writeLong(long v) { 477 Segment tail = writableSegment(8); 478 byte[] data = tail.data; 479 int limit = tail.limit; 480 data[limit++] = (byte) ((v >>> 56L) & 0xff); 481 data[limit++] = (byte) ((v >>> 48L) & 0xff); 482 data[limit++] = (byte) ((v >>> 40L) & 0xff); 483 data[limit++] = (byte) ((v >>> 32L) & 0xff); 484 data[limit++] = (byte) ((v >>> 24L) & 0xff); 485 data[limit++] = (byte) ((v >>> 16L) & 0xff); 486 data[limit++] = (byte) ((v >>> 8L) & 0xff); 487 data[limit++] = (byte) (v & 0xff); 488 tail.limit = limit; 489 size += 8; 490 return this; 491 } 492 493 @Override public BufferedSink writeLongLe(long v) { 494 return writeLong(reverseBytesLong(v)); 495 } 496 497 /** 498 * Returns a tail segment that we can write at least {@code minimumCapacity} 499 * bytes to, creating it if necessary. 500 */ 501 Segment writableSegment(int minimumCapacity) { 502 if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException(); 503 504 if (head == null) { 505 head = SegmentPool.INSTANCE.take(); // Acquire a first segment. 506 return head.next = head.prev = head; 507 } 508 509 Segment tail = head.prev; 510 if (tail.limit + minimumCapacity > Segment.SIZE) { 511 tail = tail.push(SegmentPool.INSTANCE.take()); // Append a new empty segment to fill up. 512 } 513 return tail; 514 } 515 516 @Override public void write(OkBuffer source, long byteCount) { 517 // Move bytes from the head of the source buffer to the tail of this buffer 518 // while balancing two conflicting goals: don't waste CPU and don't waste 519 // memory. 520 // 521 // 522 // Don't waste CPU (ie. don't copy data around). 523 // 524 // Copying large amounts of data is expensive. Instead, we prefer to 525 // reassign entire segments from one OkBuffer to the other. 526 // 527 // 528 // Don't waste memory. 529 // 530 // As an invariant, adjacent pairs of segments in an OkBuffer should be at 531 // least 50% full, except for the head segment and the tail segment. 532 // 533 // The head segment cannot maintain the invariant because the application is 534 // consuming bytes from this segment, decreasing its level. 535 // 536 // The tail segment cannot maintain the invariant because the application is 537 // producing bytes, which may require new nearly-empty tail segments to be 538 // appended. 539 // 540 // 541 // Moving segments between buffers 542 // 543 // When writing one buffer to another, we prefer to reassign entire segments 544 // over copying bytes into their most compact form. Suppose we have a buffer 545 // with these segment levels [91%, 61%]. If we append a buffer with a 546 // single [72%] segment, that yields [91%, 61%, 72%]. No bytes are copied. 547 // 548 // Or suppose we have a buffer with these segment levels: [100%, 2%], and we 549 // want to append it to a buffer with these segment levels [99%, 3%]. This 550 // operation will yield the following segments: [100%, 2%, 99%, 3%]. That 551 // is, we do not spend time copying bytes around to achieve more efficient 552 // memory use like [100%, 100%, 4%]. 553 // 554 // When combining buffers, we will compact adjacent buffers when their 555 // combined level doesn't exceed 100%. For example, when we start with 556 // [100%, 40%] and append [30%, 80%], the result is [100%, 70%, 80%]. 557 // 558 // 559 // Splitting segments 560 // 561 // Occasionally we write only part of a source buffer to a sink buffer. For 562 // example, given a sink [51%, 91%], we may want to write the first 30% of 563 // a source [92%, 82%] to it. To simplify, we first transform the source to 564 // an equivalent buffer [30%, 62%, 82%] and then move the head segment, 565 // yielding sink [51%, 91%, 30%] and source [62%, 82%]. 566 567 if (source == this) { 568 throw new IllegalArgumentException("source == this"); 569 } 570 checkOffsetAndCount(source.size, 0, byteCount); 571 572 while (byteCount > 0) { 573 // Is a prefix of the source's head segment all that we need to move? 574 if (byteCount < (source.head.limit - source.head.pos)) { 575 Segment tail = head != null ? head.prev : null; 576 if (tail == null || byteCount + (tail.limit - tail.pos) > Segment.SIZE) { 577 // We're going to need another segment. Split the source's head 578 // segment in two, then move the first of those two to this buffer. 579 source.head = source.head.split((int) byteCount); 580 } else { 581 // Our existing segments are sufficient. Move bytes from source's head to our tail. 582 source.head.writeTo(tail, (int) byteCount); 583 source.size -= byteCount; 584 this.size += byteCount; 585 return; 586 } 587 } 588 589 // Remove the source's head segment and append it to our tail. 590 Segment segmentToMove = source.head; 591 long movedByteCount = segmentToMove.limit - segmentToMove.pos; 592 source.head = segmentToMove.pop(); 593 if (head == null) { 594 head = segmentToMove; 595 head.next = head.prev = head; 596 } else { 597 Segment tail = head.prev; 598 tail = tail.push(segmentToMove); 599 tail.compact(); 600 } 601 source.size -= movedByteCount; 602 this.size += movedByteCount; 603 byteCount -= movedByteCount; 604 } 605 } 606 607 @Override public long read(OkBuffer sink, long byteCount) { 608 if (this.size == 0) return -1L; 609 if (byteCount > this.size) byteCount = this.size; 610 sink.write(this, byteCount); 611 return byteCount; 612 } 613 614 @Override public OkBuffer deadline(Deadline deadline) { 615 // All operations are in memory so this class doesn't need to honor deadlines. 616 return this; 617 } 618 619 @Override public long indexOf(byte b) { 620 return indexOf(b, 0); 621 } 622 623 /** 624 * Returns the index of {@code b} in this at or beyond {@code fromIndex}, or 625 * -1 if this buffer does not contain {@code b} in that range. 626 */ 627 public long indexOf(byte b, long fromIndex) { 628 Segment s = head; 629 if (s == null) return -1L; 630 long offset = 0L; 631 do { 632 int segmentByteCount = s.limit - s.pos; 633 if (fromIndex > segmentByteCount) { 634 fromIndex -= segmentByteCount; 635 } else { 636 byte[] data = s.data; 637 for (long pos = s.pos + fromIndex, limit = s.limit; pos < limit; pos++) { 638 if (data[(int) pos] == b) return offset + pos - s.pos; 639 } 640 fromIndex = 0; 641 } 642 offset += segmentByteCount; 643 s = s.next; 644 } while (s != head); 645 return -1L; 646 } 647 648 @Override public void flush() { 649 } 650 651 @Override public void close() { 652 } 653 654 /** For testing. This returns the sizes of the segments in this buffer. */ 655 List<Integer> segmentSizes() { 656 if (head == null) return Collections.emptyList(); 657 List<Integer> result = new ArrayList<Integer>(); 658 result.add(head.limit - head.pos); 659 for (Segment s = head.next; s != head; s = s.next) { 660 result.add(s.limit - s.pos); 661 } 662 return result; 663 } 664 665 @Override public boolean equals(Object o) { 666 if (!(o instanceof OkBuffer)) return false; 667 OkBuffer that = (OkBuffer) o; 668 if (size != that.size) return false; 669 if (size == 0) return true; // Both buffers are empty. 670 671 Segment sa = this.head; 672 Segment sb = that.head; 673 int posA = sa.pos; 674 int posB = sb.pos; 675 676 for (long pos = 0, count; pos < size; pos += count) { 677 count = Math.min(sa.limit - posA, sb.limit - posB); 678 679 for (int i = 0; i < count; i++) { 680 if (sa.data[posA++] != sb.data[posB++]) return false; 681 } 682 683 if (posA == sa.limit) { 684 sa = sa.next; 685 posA = sa.pos; 686 } 687 688 if (posB == sb.limit) { 689 sb = sb.next; 690 posB = sb.pos; 691 } 692 } 693 694 return true; 695 } 696 697 @Override public int hashCode() { 698 Segment s = head; 699 if (s == null) return 0; 700 int result = 1; 701 do { 702 for (int pos = s.pos, limit = s.limit; pos < limit; pos++) { 703 result = 31 * result + s.data[pos]; 704 } 705 s = s.next; 706 } while (s != head); 707 return result; 708 } 709 710 @Override public String toString() { 711 if (size == 0) { 712 return "OkBuffer[size=0]"; 713 } 714 715 if (size <= 16) { 716 ByteString data = clone().readByteString(size); 717 return String.format("OkBuffer[size=%s data=%s]", size, data.hex()); 718 } 719 720 try { 721 MessageDigest md5 = MessageDigest.getInstance("MD5"); 722 md5.update(head.data, head.pos, head.limit - head.pos); 723 for (Segment s = head.next; s != head; s = s.next) { 724 md5.update(s.data, s.pos, s.limit - s.pos); 725 } 726 return String.format("OkBuffer[size=%s md5=%s]", 727 size, ByteString.of(md5.digest()).hex()); 728 } catch (NoSuchAlgorithmException e) { 729 throw new AssertionError(); 730 } 731 } 732 733 /** Returns a deep copy of this buffer. */ 734 @Override public OkBuffer clone() { 735 OkBuffer result = new OkBuffer(); 736 if (size() == 0) return result; 737 738 result.write(head.data, head.pos, head.limit - head.pos); 739 for (Segment s = head.next; s != head; s = s.next) { 740 result.write(s.data, s.pos, s.limit - s.pos); 741 } 742 743 return result; 744 } 745 } 746