codememo

반복 가능한 크기의 덩어리를 일정한 크기로 분할하는 방법

tipmemo 2023. 7. 22. 10:06
반응형

반복 가능한 크기의 덩어리를 일정한 크기로 분할하는 방법

입력을 반복 가능하게 하고 반복 가능한 항목을 반환하는 "배치" 함수를 찾을 수 없다는 것이 놀랍습니다.

예:

for i in batch(range(0,10), 1): print i
[0]
[1]
...
[9]

또는:

for i in batch(range(0,10), 3): print i
[0,1,2]
[3,4,5]
[6,7,8]
[9]

자, 저는 제가 생각하기에 꽤 간단한 발전기를 썼습니다.

def batch(iterable, n = 1):
   current_batch = []
   for item in iterable:
       current_batch.append(item)
       if len(current_batch) == n:
           yield current_batch
           current_batch = []
   if current_batch:
       yield current_batch

그러나 위의 내용은 제가 기대했던 것을 제공하지 않습니다.

for x in   batch(range(0,10),3): print x
[0]
[0, 1]
[0, 1, 2]
[3]
[3, 4]
[3, 4, 5]
[6]
[6, 7]
[6, 7, 8]
[9]

그래서, 저는 무언가를 놓쳤고 이것은 아마도 파이썬 생성기에 대한 저의 완전한 이해 부족을 보여줍니다.누가 나에게 올바른 방향을 알려줄 수 있습니까?

[편집: 결국 위의 동작은 파이썬 자체가 아닌 ipython 내에서 실행할 때만 발생한다는 것을 깨달았습니다.]

이것이 아마도 더 효율적일 것입니다(더 빠름).

def batch(iterable, n=1):
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]

for x in batch(range(0, 10), 3):
    print x

목록 사용 예제

data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # list of data 

for x in batch(data, 3):
    print(x)

# Output

[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9, 10]

새 목록을 작성하는 것을 방지합니다.

리터 도구 모듈의 레시피는 최종 홀수 크기 로트를 처리하는 방법에 따라 두 가지 방법을 제공합니다(유지, 채우기 값으로 채우기, 무시 또는 예외 발생).

from itertools import islice, zip_longest

def batched(iterable, n):
    "Batch data into lists of length n. The last batch may be shorter."
    # batched('ABCDEFG', 3) --> ABC DEF G
    it = iter(iterable)
    while True:
        batch = list(islice(it, n))
        if not batch:
            return
        yield batch

def grouper(iterable, n, *, incomplete='fill', fillvalue=None):
    "Collect data into non-overlapping fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
    # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
    # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
    args = [iter(iterable)] * n
    if incomplete == 'fill':
        return zip_longest(*args, fillvalue=fillvalue)
    if incomplete == 'strict':
        return zip(*args, strict=True)
    if incomplete == 'ignore':
        return zip(*args)
    else:
        raise ValueError('Expected fill, strict, or ignore')

More-iter 도구에는 필요한 작업을 수행하는 두 가지 기능이 포함되어 있습니다.

  • chunked(iterable, n) 각 길이의 목록 반복 가능한 값을 반환합니다.n(더 짧을 수 있는 마지막 것은 제외);
  • ichunked(iterable, n) 유사하지만 반복 가능한 항목을 대신 반환합니다.

다른 사람들이 지적했듯이, 당신이 준 코드는 당신이 원하는 것을 정확히 수행합니다.다음을 사용한 다른 접근 방식itertools.islice다음과 같은 레시피의 를 볼 수 있습니다.

from itertools import islice, chain

def batch(iterable, size):
    sourceiter = iter(iterable)
    while True:
        batchiter = islice(sourceiter, size)
        yield chain([batchiter.next()], batchiter)

Python 3.8을 위한 솔루션(정의되지 않은 반복 가능성으로 작업하는 경우)len기능, 그리고 지치게 됩니다.

from itertools import islice

def batcher(iterable, batch_size):
    iterator = iter(iterable)
    while batch := list(islice(iterator, batch_size)):
        yield batch

사용 예:

def my_gen():
    yield from range(10)
 
for batch in batcher(my_gen(), 3):
    print(batch)

>>> [0, 1, 2]
>>> [3, 4, 5]
>>> [6, 7, 8]
>>> [9]

물론 바다코끼리 운영자 없이도 구현될 수 있습니다.

사용하지 않는 매우 짧은 코드 조각입니다.lenPython 2와 3 모두에서 작동합니다(내가 만든 것이 아님).

def chunks(iterable, size):
    from itertools import chain, islice
    iterator = iter(iterable)
    for first in iterator:
        yield list(chain([first], islice(iterator, size - 1)))

이상하다, 파이썬 2.x에서 잘 작동하는 것 같습니다.

>>> def batch(iterable, n = 1):
...    current_batch = []
...    for item in iterable:
...        current_batch.append(item)
...        if len(current_batch) == n:
...            yield current_batch
...            current_batch = []
...    if current_batch:
...        yield current_batch
...
>>> for x in batch(range(0, 10), 3):
...     print x
...
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9]

@Atra Azami의 답변에서 수정된 Python 3.8의 새로운 기능이 없는 실행 가능한 버전.

import itertools    

def batch_generator(iterable, batch_size=1):
    iterable = iter(iterable)

    while True:
        batch = list(itertools.islice(iterable, batch_size))
        if len(batch) > 0:
            yield batch
        else:
            break

for x in batch_generator(range(0, 10), 3):
    print(x)

출력:

[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9]

나는 이것이 마음에 들어보세요.

def batch(x, bs):
    return [x[i:i+bs] for i in range(0, len(x), bs)]

가 크기 배치리반를다환니합스트▁of인 배치 합니다.bs 사용하여 생성기로 만들 수 .(i for i in iterable)물론이야.

def batch(iterable, n):
    iterable=iter(iterable)
    while True:
        chunk=[]
        for i in range(n):
            try:
                chunk.append(next(iterable))
            except StopIteration:
                yield chunk
                return
        yield chunk

list(batch(range(10), 3))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

islice 및 inter(callable) 동작을 활용하여 가능한 한 많은 것을 CPython으로 이동:

from itertools import islice

def chunked(generator, size):
    """Read parts of the generator, pause each time after a chunk"""
    # islice returns results until 'size',
    # make_chunk gets repeatedly called by iter(callable).
    gen = iter(generator)
    make_chunk = lambda: list(islice(gen, size))
    return iter(make_chunk, [])

좀 더 다양한 도구에서 영감을 얻어 코드의 핵심으로 단축했습니다.

이것이 제 프로젝트에서 사용하는 것입니다.반복 가능한 한 효율적으로 목록을 처리합니다.

def chunker(iterable, size):
    if not hasattr(iterable, "__len__"):
        # generators don't have len, so fall back to slower
        # method that works with generators
        for chunk in chunker_gen(iterable, size):
            yield chunk
        return

    it = iter(iterable)
    for i in range(0, len(iterable), size):
        yield [k for k in islice(it, size)]


def chunker_gen(generator, size):
    iterator = iter(generator)
    for first in iterator:

        def chunk():
            yield first
            for more in islice(iterator, size - 1):
                yield more

        yield [k for k in chunk()]

다음은 다음과 같은 방법을 사용하는 방법입니다.reduce기능.

한 줄기:

from functools import reduce
reduce(lambda cumulator,item: cumulator[-1].append(item) or cumulator if len(cumulator[-1]) < batch_size else cumulator + [[item]], input_array, [[]])

또는 더 읽기 쉬운 버전:

from functools import reduce
def batch(input_list, batch_size):
  def reducer(cumulator, item):
    if len(cumulator[-1]) < batch_size:
      cumulator[-1].append(item)
      return cumulator
    else:
      cumulator.append([item])
    return cumulator
  return reduce(reducer, input_list, [[]])

테스트:

>>> batch([1,2,3,4,5,6,7], 3)
[[1, 2, 3], [4, 5, 6], [7]]
>>> batch(a, 8)
[[1, 2, 3, 4, 5, 6, 7]]
>>> batch([1,2,3,None,4], 3)
[[1, 2, 3], [None, 4]]

이것은 어떤 반복적인 것에도 효과가 있을 것입니다.

from itertools import zip_longest, filterfalse

def batch_iterable(iterable, batch_size=2): 
    args = [iter(iterable)] * batch_size 
    return (tuple(filterfalse(lambda x: x is None, group)) for group in zip_longest(fillvalue=None, *args))

다음과 같이 작동합니다.

>>>list(batch_iterable(range(0,5)), 2)
[(0, 1), (2, 3), (4,)]

PS: 편집 가능한 값이 없음이면 작동하지 않습니다.

일괄 인덱스를 기준으로 반복 가능한 항목을 그룹화할 수 있습니다.

def batch(items: Iterable, batch_size: int) -> Iterable[Iterable]:
    # enumerate items and group them by batch index
    enumerated_item_groups = itertools.groupby(enumerate(items), lambda t: t[0] // batch_size)
    # extract items from enumeration tuples
    item_batches = ((t[1] for t in enumerated_items) for key, enumerated_items in enumerated_item_groups)
    return item_batches

내부 반복 가능한 항목을 수집하려는 경우가 많기 때문에 고급 버전을 소개합니다.

def batch_advanced(items: Iterable, batch_size: int, batches_mapper: Callable[[Iterable], Any] = None) -> Iterable[Iterable]:
    enumerated_item_groups = itertools.groupby(enumerate(items), lambda t: t[0] // batch_size)
    if batches_mapper:
        item_batches = (batches_mapper(t[1] for t in enumerated_items) for key, enumerated_items in enumerated_item_groups)
    else:
        item_batches = ((t[1] for t in enumerated_items) for key, enumerated_items in enumerated_item_groups)
    return item_batches

예:

print(list(batch_advanced([1, 9, 3, 5, 2, 4, 2], 4, tuple)))
# [(1, 9, 3, 5), (2, 4, 2)]
print(list(batch_advanced([1, 9, 3, 5, 2, 4, 2], 4, list)))
# [[1, 9, 3, 5], [2, 4, 2]]

필요할 수 있는 관련 기능:

def batch(size, i):
    """ Get the i'th batch of the given size """
    return slice(size* i, size* i + size)

용도:

>>> [1,2,3,4,5,6,7,8,9,10][batch(3, 1)]
>>> [4, 5, 6]

판다 프레임( dataframes)과 데이터 할 수 .df.iloc[batch(100,0)]배열()은 numpy 배열()입니다.array[batch(100,0)]).

from itertools import *

class SENTINEL: pass

def batch(iterable, n):
    return (tuple(filterfalse(lambda x: x is SENTINEL, group)) for group in zip_longest(fillvalue=SENTINEL, *[iter(iterable)] * n))

print(list(range(10), 3)))
# outputs: [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]
print(list(batch([None]*10, 3)))
# outputs: [(None, None, None), (None, None, None), (None, None, None), (None,)]

사용합니다

def batchify(arr, batch_size):
  num_batches = math.ceil(len(arr) / batch_size)
  return [arr[i*batch_size:(i+1)*batch_size] for i in range(num_batches)]
  

소진될 때까지 (최대) n개의 요소를 계속 사용합니다.

def chop(n, iterable):
    iterator = iter(iterable)
    while chunk := list(take(n, iterator)):
        yield chunk


def take(n, iterable):
    iterator = iter(iterable)
    for i in range(n):
        try:
            yield next(iterator)
        except StopIteration:
            return

이 코드에는 다음과 같은 기능이 있습니다.

  • 리스트 또는 제너레이터(no len())를 입력으로 사용할 수 있습니다.
  • 다른 패키지를 가져올 필요가 없습니다.
  • 마지막 배치에 패딩이 추가되지 않았습니다.
def batch_generator(items, batch_size):
    itemid=0 # Keeps track of current position in items generator/list
    batch = [] # Empty batch
    for item in items: 
      batch.append(item) # Append items to batch
      if len(batch)==batch_size:
        yield batch
        itemid += batch_size # Increment the position in items
        batch = []
    yield batch # yield last bit

언급URL : https://stackoverflow.com/questions/8290397/how-to-split-an-iterable-in-constant-size-chunks

반응형