# Install (run in a shell, not in Python):
# pip install segmentation-models-pytorch -i https://pypi.tuna.tsinghua.edu.cn/simple
# Example: build a U-Net whose encoder is initialized with ImageNet weights.
import segmentation_models_pytorch as smp
model = smp.Unet(
    encoder_name="resnet34",     # choose encoder, e.g. mobilenet_v2 or efficientnet-b7
    encoder_weights="imagenet",  # use `imagenet` pre-trained weights for encoder initialization
    in_channels=1,               # model input channels (1 for gray-scale images, 3 for RGB, etc.)
    classes=3,                   # model output channels (number of classes in your dataset)
)
# Example: fetch the input-normalization function matching an encoder's pretrained weights.
# NOTE(review): this asks for 'resnet18' while the model above uses 'resnet34' —
# preprocessing should match the encoder actually used; confirm which is intended.
from segmentation_models_pytorch.encoders import get_preprocessing_fn
preprocess_input = get_preprocessing_fn('resnet18', pretrained='imagenet')
# Example: computing segmentation metrics with smp.metrics.
# Fix: `torch` is used below but was never imported, so this snippet raised
# NameError when run standalone.
import torch
import segmentation_models_pytorch as smp

# lets assume we have multilabel prediction for 3 classes
output = torch.rand([10, 3, 256, 256])
target = torch.rand([10, 3, 256, 256]).round().long()
# first compute statistics for true positives, false positives, false negative and
# true negative "pixels"; `threshold` binarizes the raw predictions
tp, fp, fn, tn = smp.metrics.get_stats(output, target, mode='multilabel', threshold=0.5)
# then compute metrics with required reduction (see metric docs)
iou_score = smp.metrics.iou_score(tp, fp, fn, tn, reduction="micro")
f1_score = smp.metrics.f1_score(tp, fp, fn, tn, reduction="micro")
f2_score = smp.metrics.fbeta_score(tp, fp, fn, tn, beta=2, reduction="micro")
accuracy = smp.metrics.accuracy(tp, fp, fn, tn, reduction="macro")
recall = smp.metrics.recall(tp, fp, fn, tn, reduction="micro-imagewise")
def load_img(path):
    """Load a single-channel image and return it as float32 RGB scaled to [0, 1].

    Args:
        path: filesystem path to the image (original data is uint16 grayscale).

    Returns:
        np.ndarray of shape (H, W, 3), dtype float32, values in [0, 1].

    Raises:
        FileNotFoundError: if the image cannot be read. cv2.imread returns
        None on failure instead of raising, which previously surfaced later
        as a confusing TypeError on `img[..., None]`.
    """
    img = cv2.imread(path, cv2.IMREAD_UNCHANGED)
    if img is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    img = np.tile(img[..., None], [1, 1, 3])  # gray to rgb
    img = img.astype('float32')               # original is uint16
    mx = np.max(img)
    if mx:
        img /= mx  # scale image to [0, 1]; skip division for an all-zero image
    return img
def load_msk(path):
    """Read a segmentation mask and return it as float32 normalized to [0, 1].

    Assumes the stored mask values are in the uint8 range [0, 255].
    """
    mask = cv2.imread(path, cv2.IMREAD_UNCHANGED).astype('float32')
    return mask / 255.0
def build_model(backbone, num_classes, device):
    """Construct an ImageNet-pretrained U-Net and move it to `device`.

    Args:
        backbone: encoder architecture name, e.g. 'resnet34' or 'efficientnet-b7'.
        num_classes: number of output channels (classes in the dataset).
        device: torch device the model is placed on.

    Returns:
        The smp.Unet instance, already on `device`, emitting raw logits.
    """
    net = smp.Unet(
        encoder_name=backbone,       # encoder architecture
        encoder_weights="imagenet",  # pretrained encoder initialization
        in_channels=3,               # RGB input
        classes=num_classes,         # output channels
        activation=None,             # raw logits; apply sigmoid/softmax downstream
    )
    # .to() returns the module itself, so this is the same object relocated.
    return net.to(device)
# Instantiate the model and both candidate loss functions from the config;
# criterion() below selects between them based on CFG.loss_func.
model = build_model(CFG.backbone, CFG.num_classes, CFG.device)
DiceLoss = smp.losses.DiceLoss(mode='binary')
BCELoss = smp.losses.SoftBCEWithLogitsLoss()
def criterion(y_pred, y_true):
    """Compute the loss selected by CFG.loss_func.

    Args:
        y_pred: model logits, shape (N, C, H, W).
        y_true: target mask; for BCE a channel dim is inserted to match y_pred.

    Returns:
        Scalar loss tensor.

    Raises:
        ValueError: for an unrecognized CFG.loss_func. The original fell
        through and silently returned None, which crashed much later at
        `loss.backward()` with an unrelated-looking error.
    """
    if CFG.loss_func == "DiceLoss":
        return DiceLoss(y_pred, y_true)
    elif CFG.loss_func == "BCELoss":
        # BCE expects the target with an explicit channel dim: (N,H,W) -> (N,1,H,W)
        y_true = y_true.unsqueeze(1)
        return BCELoss(y_pred, y_true)
    raise ValueError(f"Unsupported loss function: {CFG.loss_func!r}")
def dice_coef(y_true, y_pred, thr=0.5, dim=(2, 3), epsilon=0.001):
    """Mean Dice coefficient between thresholded predictions and targets.

    Args:
        y_true: targets of shape (N, H, W); a channel dim is inserted here.
        y_pred: predicted probabilities of shape (N, 1, H, W).
        thr: binarization threshold applied to y_pred (strict >).
        dim: spatial dims the per-sample sums reduce over.
        epsilon: smoothing term avoiding division by zero on empty masks.

    Returns:
        Scalar tensor: Dice averaged over batch and channel dims.
    """
    truth = y_true.unsqueeze(1).to(torch.float32)
    preds = (y_pred > thr).to(torch.float32)
    intersection = (truth * preds).sum(dim=dim)
    denominator = truth.sum(dim=dim) + preds.sum(dim=dim)
    per_sample = (2 * intersection + epsilon) / (denominator + epsilon)
    return per_sample.mean(dim=(1, 0))
def iou_coef(y_true, y_pred, thr=0.5, dim=(2, 3), epsilon=0.001):
    """Mean IoU (Jaccard) between thresholded predictions and targets.

    Args:
        y_true: targets of shape (N, H, W); a channel dim is inserted here.
        y_pred: predicted probabilities of shape (N, 1, H, W).
        thr: binarization threshold applied to y_pred (strict >).
        dim: spatial dims the per-sample sums reduce over.
        epsilon: smoothing term avoiding division by zero on empty masks.

    Returns:
        Scalar tensor: IoU averaged over batch and channel dims.
    """
    truth = y_true.unsqueeze(1).to(torch.float32)
    preds = (y_pred > thr).to(torch.float32)
    intersection = (truth * preds).sum(dim=dim)
    # |A ∪ B| = |A| + |B| - |A ∩ B|, computed elementwise for binary masks
    union = (truth + preds - truth * preds).sum(dim=dim)
    return ((intersection + epsilon) / (union + epsilon)).mean(dim=(1, 0))
def fetch_scheduler(optimizer):
    """Build the learning-rate scheduler named by CFG.scheduler.

    Args:
        optimizer: the optimizer whose LR the scheduler will drive.

    Returns:
        A torch.optim.lr_scheduler instance, or None when CFG.scheduler is None.

    Raises:
        ValueError: for an unrecognized scheduler name. Bug fixes vs the
        original: (1) the 'ExponentialLR' branch tested `CFG.scheduer`
        (typo), raising AttributeError and making that branch unreachable;
        (2) an unknown name fell through to `return scheduler` with the
        variable unbound, raising NameError.
    """
    name = CFG.scheduler
    if name == 'CosineAnnealingLR':
        return lr_scheduler.CosineAnnealingLR(optimizer, T_max=CFG.T_max,
                                              eta_min=CFG.min_lr)
    if name == 'CosineAnnealingWarmRestarts':
        return lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=CFG.T_0,
                                                        eta_min=CFG.min_lr)
    if name == 'ReduceLROnPlateau':
        return lr_scheduler.ReduceLROnPlateau(optimizer,
                                              mode='min',
                                              factor=0.1,
                                              patience=7,
                                              threshold=0.0001,
                                              min_lr=CFG.min_lr)
    if name == 'ExponentialLR':
        return lr_scheduler.ExponentialLR(optimizer, gamma=0.85)
    if name is None:
        return None
    raise ValueError(f"Unknown scheduler: {name!r}")
# Adam with weight decay from the config; scheduler selected by CFG.scheduler.
optimizer = optim.Adam(model.parameters(), lr=CFG.lr, weight_decay=CFG.wd)
scheduler = fetch_scheduler(optimizer)
def train_one_epoch(model, optimizer, scheduler, dataloader, device, epoch):
    """Run one training epoch with mixed precision and gradient accumulation.

    Args:
        model: network in training mode producing logits from images.
        optimizer: optimizer stepped every CFG.n_accumulate batches.
        scheduler: optional LR scheduler, stepped after each optimizer step.
        dataloader: yields (images, masks) batches.
        device: device tensors are moved to.
        epoch: current epoch number (display only).

    Returns:
        Dataset-size-weighted average loss over the epoch.

    NOTE(review): gradients are zeroed only *after* an optimizer step, so any
    pre-existing gradients are folded into the first step — confirm grads are
    clean when this is first called.
    NOTE(review): if len(dataloader) is not a multiple of CFG.n_accumulate,
    the final partial accumulation window is never stepped.
    NOTE(review): running_loss accumulates the loss already divided by
    n_accumulate, so the reported epoch loss is scaled down by that factor.
    """
    model.train()
    # Dynamic loss scaling so fp16 gradients don't underflow under autocast.
    scaler = amp.GradScaler()
    dataset_size = 0
    running_loss = 0.0
    pbar = tqdm(enumerate(dataloader), total=len(dataloader), desc='Train ')
    for step, (images, masks) in pbar:
        images = images.to(device, dtype=torch.float)
        masks = masks.to(device, dtype=torch.float)
        batch_size = images.size(0)
        with amp.autocast(enabled=True):
            y_pred = model(images)
            loss = criterion(y_pred, masks)
            # Scale down so the accumulated gradient matches the mean over
            # the full effective batch.
            loss = loss / CFG.n_accumulate
        scaler.scale(loss).backward()
        if (step + 1) % CFG.n_accumulate == 0:
            scaler.step(optimizer)
            scaler.update()
            # zero the parameter gradients
            optimizer.zero_grad()
            if scheduler is not None:
                scheduler.step()
        running_loss += (loss.item() * batch_size)
        dataset_size += batch_size
        # Running average recomputed per step for the progress bar.
        epoch_loss = running_loss / dataset_size
        mem = torch.cuda.memory_reserved() / 1E9 if torch.cuda.is_available() else 0
        current_lr = optimizer.param_groups[0]['lr']
        pbar.set_postfix(epoch=f'{epoch}',
                         train_loss=f'{epoch_loss:0.4f}',
                         lr=f'{current_lr:0.5f}',
                         gpu_mem=f'{mem:0.2f} GB')
    torch.cuda.empty_cache()
    gc.collect()
    return epoch_loss
@torch.no_grad()
def valid_one_epoch(model, dataloader, device, epoch):
    """Evaluate the model for one epoch; no gradients are tracked.

    Args:
        model: network set to eval mode; outputs logits.
        dataloader: yields (images, masks) batches.
        device: device tensors are moved to.
        epoch: current epoch number (unused beyond the signature).

    Returns:
        (epoch_loss, val_scores) where epoch_loss is the size-weighted mean
        validation loss and val_scores is [mean_dice, mean_jaccard].

    NOTE(review): `optimizer` below is read from the enclosing module scope,
    not passed in — this function silently depends on that global existing.
    """
    model.eval()
    dataset_size = 0
    running_loss = 0.0
    val_scores = []
    pbar = tqdm(enumerate(dataloader), total=len(dataloader), desc='Valid ')
    for step, (images, masks) in pbar:
        images = images.to(device, dtype=torch.float)
        masks = masks.to(device, dtype=torch.float)
        batch_size = images.size(0)
        y_pred = model(images)
        loss = criterion(y_pred, masks)
        running_loss += (loss.item() * batch_size)
        dataset_size += batch_size
        # Running average recomputed per step for the progress bar.
        epoch_loss = running_loss / dataset_size
        # Logits -> probabilities before thresholded metric computation.
        y_pred = nn.Sigmoid()(y_pred)
        val_dice = dice_coef(masks, y_pred).cpu().detach().numpy()
        val_jaccard = iou_coef(masks, y_pred).cpu().detach().numpy()
        val_scores.append([val_dice, val_jaccard])
        mem = torch.cuda.memory_reserved() / 1E9 if torch.cuda.is_available() else 0
        current_lr = optimizer.param_groups[0]['lr']
        pbar.set_postfix(valid_loss=f'{epoch_loss:0.4f}',
                         lr=f'{current_lr:0.5f}',
                         gpu_memory=f'{mem:0.2f} GB')
    # Average the per-batch [dice, jaccard] pairs over the epoch.
    val_scores = np.mean(val_scores, axis=0)
    torch.cuda.empty_cache()
    gc.collect()
    return epoch_loss, val_scores
def run_training(model, optimizer, scheduler, device, num_epochs):
    """Full train/validate loop with best-checkpoint tracking.

    Args:
        model: the network to train (modified in place).
        optimizer: optimizer used by train_one_epoch.
        scheduler: optional LR scheduler forwarded to train_one_epoch.
        device: unused directly here; epochs read CFG.device instead.
        num_epochs: number of epochs to run.

    Returns:
        (model, history): the model reloaded with the best-loss weights, and
        a dict of per-epoch 'Train Loss' / 'Valid Loss' / 'Valid Dice' /
        'Valid Jaccard' lists.

    Side effects: writes 'best_epoch.bin' on each improvement and
    'last_epoch.bin' every epoch.

    NOTE(review): `train_loader`, `valid_loader`, `c_` and `sr_` (presumably
    terminal color codes) come from the enclosing module scope — confirm
    they are defined before calling.
    """
    if torch.cuda.is_available():
        print("cuda: {}\n".format(torch.cuda.get_device_name()))
    start = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    # best_loss starts at +inf, so the first epoch always registers as an
    # improvement and initializes best_dice/best_jaccard/best_epoch.
    best_loss = np.inf
    best_epoch = -1
    history = defaultdict(list)
    for epoch in range(1, num_epochs + 1):
        gc.collect()
        print(f'Epoch {epoch}/{num_epochs}', end='')
        train_loss = train_one_epoch(model, optimizer, scheduler,
                                     dataloader=train_loader,
                                     device=CFG.device, epoch=epoch)
        val_loss, val_scores = valid_one_epoch(model, valid_loader,
                                               device=CFG.device,
                                               epoch=epoch)
        val_dice, val_jaccard = val_scores
        history['Train Loss'].append(train_loss)
        history['Valid Loss'].append(val_loss)
        history['Valid Dice'].append(val_dice)
        history['Valid Jaccard'].append(val_jaccard)
        print(f'Valid Dice: {val_dice:0.4f} | Valid Jaccard: {val_jaccard:0.4f}')
        print(f'Valid Loss: {val_loss}')
        # deep copy the model
        if val_loss <= best_loss:
            print(f"{c_}Valid loss Improved ({best_loss} ---> {val_loss})")
            best_dice = val_dice
            best_jaccard = val_jaccard
            best_loss = val_loss
            best_epoch = epoch
            best_model_wts = copy.deepcopy(model.state_dict())
            PATH = "best_epoch.bin"
            torch.save(model.state_dict(), PATH)
            print(f"Model Saved{sr_}")
        # Always keep a copy of the most recent weights as well.
        last_model_wts = copy.deepcopy(model.state_dict())
        PATH = "last_epoch.bin"
        torch.save(model.state_dict(), PATH)
        print(); print()
    end = time.time()
    time_elapsed = end - start
    print('Training complete in {:.0f}h {:.0f}m {:.0f}s'.format(
        time_elapsed // 3600, (time_elapsed % 3600) // 60, (time_elapsed % 3600) % 60))
    print("Best Loss: {:.4f}".format(best_loss))
    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, history
# Kick off training; returns the model with best-validation-loss weights
# loaded, plus the per-epoch metric history.
model, history = run_training(model, optimizer, scheduler,
                              device=CFG.device,
                              num_epochs=CFG.epochs)
# --- scraped web-page footer (site boilerplate, not part of the code) ---
# 因篇幅问题不能全部显示,请点此查看更多更全内容
# Copyright © 2019- sceh.cn 版权所有 湘ICP备2023017654号-4
# 违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
# 本站由北京市万商天勤律师事务所王兴未律师提供法律服务