diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9c6d5e8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +# MacOS +# ===== +.DS_Store \ No newline at end of file diff --git a/README.md b/README.md index 7874947..a492227 100644 --- a/README.md +++ b/README.md @@ -3,9 +3,12 @@ The offical code https://github.com/ChenyangSi/FreeU ## Usage + +### Image Pipelines + ```python -from diffusers import StableDiffusionPipeline import torch +from diffusers import StableDiffusionPipeline from .free_lunch_utils import register_free_upblock2d, register_free_crossattn_upblock2d model_id = "runwayml/stable-diffusion-v1-5" @@ -23,5 +26,32 @@ image = pipe(prompt).images[0] image.save("astronaut_rides_horse.png") ``` +### Video Pipelines + +```python +import torch +from diffusers import TextToVideoSDPipeline +from diffusers.utils import export_to_video +from .free_lunch_utils import register_free_upblock3d, register_free_crossattn_upblock3d + +model_id = "cerspense/zeroscope_v2_576w" +pipe = TextToVideoSDPipeline.from_pretrained(model_id, torch_dtype=torch.float16) +pipe = pipe.to("cuda") + +# -------- freeu block registration +register_free_upblock3d(pipe, b1=1.2, b2=1.4, s1=0.9, s2=0.2) +register_free_crossattn_upblock3d(pipe, b1=1.2, b2=1.4, s1=0.9, s2=0.2) +# -------- freeu block registration + +prompt = "an astronaut riding a horse on mars" +video_frames = pipe(prompt, height=320, width=576, num_frames=30).frames + +export_to_video(video_frames, "astronaut_rides_horse.mp4") +``` + +#### 28/09/23 +Current version was successfully run on diffusers v0.21.2. + +#### 26/09/23 Note that it is supported and tested on diffusers v0.19.3. -If you are using the latest diffusers, it is recommended to use the corresponding branch, but it has not been tested. +If you are using the latest diffusers, it is recommended to use the corresponding branch, but it has not been tested. 
\ No newline at end of file diff --git a/__init__.py b/__init__.py index 3608dfa..f2bd7dd 100644 --- a/__init__.py +++ b/__init__.py @@ -1 +1,5 @@ -from .free_lunch_utils import register_upblock2d, register_free_upblock2d, register_crossattn_upblock2d, register_free_crossattn_upblock2d \ No newline at end of file +from .free_lunch_utils import ( + register_upblock2d, register_free_upblock2d, + register_crossattn_upblock2d, register_free_crossattn_upblock2d, + register_upblock3d, register_free_upblock3d +) \ No newline at end of file diff --git a/free_lunch_utils.py b/free_lunch_utils.py index 79d79cd..8763505 100644 --- a/free_lunch_utils.py +++ b/free_lunch_utils.py @@ -1,8 +1,10 @@ +from typing import Any, Dict, Optional, Tuple + import torch import torch.fft as fft -from diffusers.models.unet_2d_condition import logger from diffusers.utils import is_torch_version -from typing import Any, Dict, List, Optional, Tuple, Union +from diffusers.models.unet_2d_condition import logger as logger2d +from diffusers.models.unet_3d_condition import logger as logger3d def isinstance_str(x: object, cls_name: str): @@ -20,36 +22,57 @@ def isinstance_str(x: object, cls_name: str): return False -def Fourier_filter(x, threshold, scale): - dtype = x.dtype - x = x.type(torch.float32) +def Fourier_filter(x_in, threshold, scale): + """ + Updated Fourier filter based on: + https://github.com/huggingface/diffusers/pull/5164#issuecomment-1732638706 + """ + + x = x_in + B, C, H, W = x.shape + + # Non-power of 2 images must be float32 + if (W & (W - 1)) != 0 or (H & (H - 1)) != 0: + x = x.to(dtype=torch.float32) + # FFT x_freq = fft.fftn(x, dim=(-2, -1)) x_freq = fft.fftshift(x_freq, dim=(-2, -1)) - + B, C, H, W = x_freq.shape - mask = torch.ones((B, C, H, W)).cuda() + mask = torch.ones((B, C, H, W), device=x.device) - crow, ccol = H // 2, W //2 - mask[..., crow - threshold:crow + threshold, ccol - threshold:ccol + threshold] = scale + crow, ccol = H // 2, W // 2 + mask[..., crow - 
threshold : crow + threshold, ccol - threshold : ccol + threshold] = scale x_freq = x_freq * mask # IFFT x_freq = fft.ifftshift(x_freq, dim=(-2, -1)) x_filtered = fft.ifftn(x_freq, dim=(-2, -1)).real - - x_filtered = x_filtered.type(dtype) - return x_filtered + + return x_filtered.to(dtype=x_in.dtype) def register_upblock2d(model): + """ + Register UpBlock2D for UNet2DCondition. + """ + def up_forward(self): - def forward(hidden_states, res_hidden_states_tuple, temb=None, upsample_size=None): + def forward( + hidden_states, + res_hidden_states_tuple, + temb=None, + upsample_size=None, + scale: float = 1.0 + ): + logger2d.debug(f"in upblock2d, hidden states shape: {hidden_states.shape}") + for resnet in self.resnets: # pop res hidden states res_hidden_states = res_hidden_states_tuple[-1] res_hidden_states_tuple = res_hidden_states_tuple[:-1] - #print(f"in upblock2d, hidden states shape: {hidden_states.shape}") + hidden_states = torch.cat([hidden_states, res_hidden_states], dim=1) if self.training and self.gradient_checkpointing: @@ -69,11 +92,11 @@ def custom_forward(*inputs): create_custom_forward(resnet), hidden_states, temb ) else: - hidden_states = resnet(hidden_states, temb) + hidden_states = resnet(hidden_states, temb, scale=scale) if self.upsamplers is not None: for upsampler in self.upsamplers: - hidden_states = upsampler(hidden_states, upsample_size) + hidden_states = upsampler(hidden_states, upsample_size, scale=scale) return hidden_states @@ -85,13 +108,25 @@ def custom_forward(*inputs): def register_free_upblock2d(model, b1=1.2, b2=1.4, s1=0.9, s2=0.2): + """ + Register UpBlock2D with FreeU for UNet2DCondition. 
+ """ + def up_forward(self): - def forward(hidden_states, res_hidden_states_tuple, temb=None, upsample_size=None): + def forward( + hidden_states, + res_hidden_states_tuple, + temb=None, + upsample_size=None, + scale: float = 1.0 + ): + logger2d.debug(f"in free upblock2d, hidden states shape: {hidden_states.shape}") + for resnet in self.resnets: # pop res hidden states res_hidden_states = res_hidden_states_tuple[-1] res_hidden_states_tuple = res_hidden_states_tuple[:-1] - #print(f"in free upblock2d, hidden states shape: {hidden_states.shape}") + # --------------- FreeU code ----------------------- # Only operate on the first two stages if hidden_states.shape[1] == 1280: @@ -121,11 +156,11 @@ def custom_forward(*inputs): create_custom_forward(resnet), hidden_states, temb ) else: - hidden_states = resnet(hidden_states, temb) + hidden_states = resnet(hidden_states, temb, scale=scale) if self.upsamplers is not None: for upsampler in self.upsamplers: - hidden_states = upsampler(hidden_states, upsample_size) + hidden_states = upsampler(hidden_states, upsample_size, scale=scale) return hidden_states @@ -141,6 +176,10 @@ def custom_forward(*inputs): def register_crossattn_upblock2d(model): + """ + Register CrossAttn UpBlock2D for UNet2DCondition. 
+ """ + def up_forward(self): def forward( hidden_states: torch.FloatTensor, @@ -152,9 +191,12 @@ def forward( attention_mask: Optional[torch.FloatTensor] = None, encoder_attention_mask: Optional[torch.FloatTensor] = None, ): + logger2d.debug(f"in crossatten upblock2d, hidden states shape: {hidden_states.shape}") + + lora_scale = cross_attention_kwargs.get("scale", 1.0) if cross_attention_kwargs is not None else 1.0 + for resnet, attn in zip(self.resnets, self.attentions): # pop res hidden states - #print(f"in crossatten upblock2d, hidden states shape: {hidden_states.shape}") res_hidden_states = res_hidden_states_tuple[-1] res_hidden_states_tuple = res_hidden_states_tuple[:-1] hidden_states = torch.cat([hidden_states, res_hidden_states], dim=1) @@ -189,7 +231,7 @@ def custom_forward(*inputs): **ckpt_kwargs, )[0] else: - hidden_states = resnet(hidden_states, temb) + hidden_states = resnet(hidden_states, temb, scale=lora_scale) hidden_states = attn( hidden_states, encoder_hidden_states=encoder_hidden_states, @@ -201,7 +243,7 @@ def custom_forward(*inputs): if self.upsamplers is not None: for upsampler in self.upsamplers: - hidden_states = upsampler(hidden_states, upsample_size) + hidden_states = upsampler(hidden_states, upsample_size, scale=lora_scale) return hidden_states @@ -213,6 +255,10 @@ def custom_forward(*inputs): def register_free_crossattn_upblock2d(model, b1=1.2, b2=1.4, s1=0.9, s2=0.2): + """ + Register CrossAttn UpBlock2D with FreeU for UNet2DCondition. 
+ """ + def up_forward(self): def forward( hidden_states: torch.FloatTensor, @@ -224,9 +270,12 @@ def forward( attention_mask: Optional[torch.FloatTensor] = None, encoder_attention_mask: Optional[torch.FloatTensor] = None, ): + logger2d.debug(f"in free crossatten upblock2d, hidden states shape: {hidden_states.shape}") + + lora_scale = cross_attention_kwargs.get("scale", 1.0) if cross_attention_kwargs is not None else 1.0 + for resnet, attn in zip(self.resnets, self.attentions): # pop res hidden states - #print(f"in free crossatten upblock2d, hidden states shape: {hidden_states.shape}") res_hidden_states = res_hidden_states_tuple[-1] res_hidden_states_tuple = res_hidden_states_tuple[:-1] @@ -272,7 +321,7 @@ def custom_forward(*inputs): **ckpt_kwargs, )[0] else: - hidden_states = resnet(hidden_states, temb) + hidden_states = resnet(hidden_states, temb, scale=lora_scale) hidden_states = attn( hidden_states, encoder_hidden_states=encoder_hidden_states, @@ -284,7 +333,7 @@ def custom_forward(*inputs): if self.upsamplers is not None: for upsampler in self.upsamplers: - hidden_states = upsampler(hidden_states, upsample_size) + hidden_states = upsampler(hidden_states, upsample_size, scale=lora_scale) return hidden_states @@ -292,6 +341,216 @@ def custom_forward(*inputs): for i, upsample_block in enumerate(model.unet.up_blocks): if isinstance_str(upsample_block, "CrossAttnUpBlock2D"): + upsample_block.forward = up_forward(upsample_block) + setattr(upsample_block, 'b1', b1) + setattr(upsample_block, 'b2', b2) + setattr(upsample_block, 's1', s1) + setattr(upsample_block, 's2', s2) + + +def register_upblock3d(model): + """ + Register UpBlock3D for UNet3DCondition. 
+ """ + + def up_forward(self): + def forward( + hidden_states, + res_hidden_states_tuple, + temb=None, + upsample_size=None, + num_frames=1 + ): + + logger3d.debug(f"in upblock3d, hidden states shape: {hidden_states.shape}") + + for resnet, temp_conv in zip(self.resnets, self.temp_convs): + # pop res hidden states + res_hidden_states = res_hidden_states_tuple[-1] + res_hidden_states_tuple = res_hidden_states_tuple[:-1] + + hidden_states = torch.cat([hidden_states, res_hidden_states], dim=1) + + hidden_states = resnet(hidden_states, temb) + hidden_states = temp_conv(hidden_states, num_frames=num_frames) + + if self.upsamplers is not None: + for upsampler in self.upsamplers: + hidden_states = upsampler(hidden_states, upsample_size) + + return hidden_states + + return forward + + for i, upsample_block in enumerate(model.unet.up_blocks): + if isinstance_str(upsample_block, "UpBlock3D"): + upsample_block.forward = up_forward(upsample_block) + + +def register_free_upblock3d(model, b1=1.2, b2=1.4, s1=0.9, s2=0.2): + """ + Register UpBlock3D with FreeU for UNet3DCondition. 
+ """ + + def up_forward(self): + def forward( + hidden_states, + res_hidden_states_tuple, + temb=None, + upsample_size=None, + num_frames=1 + ): + + logger3d.debug(f"in free upblock3d, hidden states shape: {hidden_states.shape}") + + for resnet, temp_conv in zip(self.resnets, self.temp_convs): + # pop res hidden states + res_hidden_states = res_hidden_states_tuple[-1] + res_hidden_states_tuple = res_hidden_states_tuple[:-1] + + # --------------- FreeU code ----------------------- + # Only operate on the first two stages + if hidden_states.shape[1] == 1280: + hidden_states[:,:640] = hidden_states[:,:640] * self.b1 + res_hidden_states = Fourier_filter(res_hidden_states, threshold=1, scale=self.s1) + if hidden_states.shape[1] == 640: + hidden_states[:,:320] = hidden_states[:,:320] * self.b2 + res_hidden_states = Fourier_filter(res_hidden_states, threshold=1, scale=self.s2) + # --------------------------------------------------------- + + hidden_states = torch.cat([hidden_states, res_hidden_states], dim=1) + + hidden_states = resnet(hidden_states, temb) + hidden_states = temp_conv(hidden_states, num_frames=num_frames) + + if self.upsamplers is not None: + for upsampler in self.upsamplers: + hidden_states = upsampler(hidden_states, upsample_size) + + return hidden_states + + return forward + + for i, upsample_block in enumerate(model.unet.up_blocks): + if isinstance_str(upsample_block, "UpBlock3D"): + upsample_block.forward = up_forward(upsample_block) + setattr(upsample_block, 'b1', b1) + setattr(upsample_block, 'b2', b2) + setattr(upsample_block, 's1', s1) + setattr(upsample_block, 's2', s2) + + +def register_crossattn_upblock3d(model): + """ + Register CrossAttn UpBlock3D for UNet3DCondition. 
+ """ + + def up_forward(self): + def forward( + hidden_states: torch.FloatTensor, + res_hidden_states_tuple: Tuple[torch.FloatTensor, ...], + temb: Optional[torch.FloatTensor] = None, + encoder_hidden_states: Optional[torch.FloatTensor] = None, + cross_attention_kwargs: Optional[Dict[str, Any]] = None, + upsample_size: Optional[int] = None, + attention_mask: Optional[torch.FloatTensor] = None, + num_frames: int = 1 + ): + logger3d.debug(f"in crossatten upblock3d, hidden states shape: {hidden_states.shape}") + + for resnet, temp_conv, attn, temp_attn in zip( + self.resnets, self.temp_convs, self.attentions, self.temp_attentions + ): + # pop res hidden states + res_hidden_states = res_hidden_states_tuple[-1] + res_hidden_states_tuple = res_hidden_states_tuple[:-1] + + hidden_states = torch.cat([hidden_states, res_hidden_states], dim=1) + + hidden_states = resnet(hidden_states, temb) + hidden_states = temp_conv(hidden_states, num_frames=num_frames) + hidden_states = attn( + hidden_states, + encoder_hidden_states=encoder_hidden_states, + cross_attention_kwargs=cross_attention_kwargs, + return_dict=False, + )[0] + hidden_states = temp_attn( + hidden_states, num_frames=num_frames, cross_attention_kwargs=cross_attention_kwargs, return_dict=False + )[0] + + if self.upsamplers is not None: + for upsampler in self.upsamplers: + hidden_states = upsampler(hidden_states, upsample_size) + + return hidden_states + + return forward + + for i, upsample_block in enumerate(model.unet.up_blocks): + if isinstance_str(upsample_block, "CrossAttnUpBlock3D"): + upsample_block.forward = up_forward(upsample_block) + + +def register_free_crossattn_upblock3d(model, b1=1.2, b2=1.4, s1=0.9, s2=0.2): + """ + Register CrossAttn UpBlock3D with FreeU for UNet3DCondition. 
+ """ + + def up_forward(self): + def forward( + hidden_states: torch.FloatTensor, + res_hidden_states_tuple: Tuple[torch.FloatTensor, ...], + temb: Optional[torch.FloatTensor] = None, + encoder_hidden_states: Optional[torch.FloatTensor] = None, + cross_attention_kwargs: Optional[Dict[str, Any]] = None, + upsample_size: Optional[int] = None, + attention_mask: Optional[torch.FloatTensor] = None, + num_frames: int = 1 + ): + logger3d.debug(f"in free crossatten upblock3d, hidden states shape: {hidden_states.shape}") + + for resnet, temp_conv, attn, temp_attn in zip( + self.resnets, self.temp_convs, self.attentions, self.temp_attentions + ): + # pop res hidden states + res_hidden_states = res_hidden_states_tuple[-1] + res_hidden_states_tuple = res_hidden_states_tuple[:-1] + + # --------------- FreeU code ----------------------- + # Only operate on the first two stages + if hidden_states.shape[1] == 1280: + hidden_states[:,:640] = hidden_states[:,:640] * self.b1 + res_hidden_states = Fourier_filter(res_hidden_states, threshold=1, scale=self.s1) + if hidden_states.shape[1] == 640: + hidden_states[:,:320] = hidden_states[:,:320] * self.b2 + res_hidden_states = Fourier_filter(res_hidden_states, threshold=1, scale=self.s2) + # --------------------------------------------------------- + + hidden_states = torch.cat([hidden_states, res_hidden_states], dim=1) + + hidden_states = resnet(hidden_states, temb) + hidden_states = temp_conv(hidden_states, num_frames=num_frames) + hidden_states = attn( + hidden_states, + encoder_hidden_states=encoder_hidden_states, + cross_attention_kwargs=cross_attention_kwargs, + return_dict=False, + )[0] + hidden_states = temp_attn( + hidden_states, num_frames=num_frames, cross_attention_kwargs=cross_attention_kwargs, return_dict=False + )[0] + + if self.upsamplers is not None: + for upsampler in self.upsamplers: + hidden_states = upsampler(hidden_states, upsample_size) + + return hidden_states + + return forward + + for i, upsample_block in 
enumerate(model.unet.up_blocks): + if isinstance_str(upsample_block, "CrossAttnUpBlock3D"): upsample_block.forward = up_forward(upsample_block) setattr(upsample_block, 'b1', b1) setattr(upsample_block, 'b2', b2)