public ByteBuffer allocate(int size, long maxTimeToBlockMs)throws InterruptedException { if (size > this.totalMemory) thrownew IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations.");
ByteBuffer buffer = null; this.lock.lock(); try { // check if we have a free buffer of the right size pooled // 如果申请的是一个标准的ByteBuffer,并且free队列里面有缓存的,直接取出并返回 if (size == poolableSize && !this.free.isEmpty()) returnthis.free.pollFirst();
// now check if the request is immediately satisfiable with the // memory on hand or if we need to block // 空闲队列的总字节数大小 = 队列长度 * 单个缓冲区大小 int freeListSize = freeSize() * this.poolableSize; // 未被使用的空间大小 + 空闲队列大小 >= 要申请的size if (this.nonPooledAvailableMemory + freeListSize >= size) { // we have enough unallocated or pooled memory to immediately // satisfy the request, but need to allocate the buffer // 确保现有的空间足够分配 freeUp(size); // 并没有影响nonPooledAvailableMemory的语义 this.nonPooledAvailableMemory -= size; } else { // 内存不够用了,就要阻塞 // we are out of memory and will have to block
// 记录已分配的内存,size有可能需要多次分配 int accumulated = 0; // 获取竞态条件 Condition moreMemory = this.lock.newCondition(); try { // 阻塞时间 long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); // 竞争队列,添加到了最末尾,说明这是公平锁,先来的请求先获取内存 this.waiters.addLast(moreMemory); // loop over and over until we have a buffer or have reserved // enough memory to allocate one while (accumulated < size) { // 本次获取缓冲区的开始时间 long startWaitNs = time.nanoseconds(); long timeNs; // 等待是否超时 boolean waitingTimeElapsed; try { // 等待时,会在此停止运行 waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } finally { long endWaitNs = time.nanoseconds(); // 记录阻塞时,等待的时间 timeNs = Math.max(0L, endWaitNs - startWaitNs); // 收集指标 this.waitTime.record(timeNs, time.milliseconds()); }
// 等待超时 if (waitingTimeElapsed) { thrownew TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); }
// 分两种情况, // 1. size=batch.size, 从free里获取,结束 // 2. size > 或 < batch.size 都会判断下一次循环条件,来确定是否退出 // check if we can satisfy this request from the free list, // otherwise allocate memory if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { // just grab a buffer from the free list // 从free队列里取出 buffer = this.free.pollFirst(); accumulated = size; } else { // size > poolableSize 或者 size < poolableSize // we'll need to allocate memory, but we may only get // part of what we need on this iteration // 确保有足够的内存 freeUp(size - accumulated); // accumulated=0,要分配的size为100,现有的nonPooledAvailableMemory只有80 // 就只能先分配80,然后在下一轮循环里等待 int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory); this.nonPooledAvailableMemory -= got; // accumulated += got; } } // 这里很有意思, 这在下面的finally里表示分配过程中 是否有分配失败的内存 // Don't reclaim memory on throwable since nothing was thrown accumulated = 0; } finally { // When this loop was not able to successfully terminate don't loose available memory // 分配成功就是0,分配失败就不是0了,这里是在回收分配失败的内存 this.nonPooledAvailableMemory += accumulated; // 从队列里移除竞态条件 this.waiters.remove(moreMemory); } } } finally { // signal any additional waiters if there is more memory left // over for them try { // 如果有空闲的空间,并且还有在等待的内存分配申请,就唤醒等待队列(第一个) if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } finally { // Another finally... otherwise find bugs complains // 释放锁 lock.unlock(); } }